You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/08/02 00:17:32 UTC

svn commit: r1509455 - in /pig/trunk: CHANGES.txt src/org/apache/pig/newplan/PColFilterExtractor.java test/org/apache/pig/test/TestPartitionFilterPushDown.java

Author: cheolsoo
Date: Thu Aug  1 22:17:32 2013
New Revision: 1509455

URL: http://svn.apache.org/r1509455
Log:
PIG-3395: Large filter expression makes Pig hang (cheolsoo)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
    pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1509455&r1=1509454&r2=1509455&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug  1 22:17:32 2013
@@ -200,6 +200,8 @@ PIG-2910: Add function to read schema fr
 
 OPTIMIZATIONS
 
+PIG-3395: Large filter expression makes Pig hang (cheolsoo)
+
 PIG-3123: Simplify Logical Plans By Removing Unneccessary Identity Projections (njw45 via cheolsoo)
 
 PIG-3013: BinInterSedes improve chararray sort performance (rohini)

Modified: pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java?rev=1509455&r1=1509454&r2=1509455&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java Thu Aug  1 22:17:32 2013
@@ -68,417 +68,457 @@ public class PColFilterExtractor extends
 
     private static final Log LOG = LogFactory.getLog(PColFilterExtractor.class);
 
-	/**
-	 * partition columns associated with the table
-	 * present in the load on which the filter whose
-	 * inner plan is being visited is applied
-	 */
-	private List<String> partitionCols;
-
-	/**
-	 * will contain the partition column filter conditions
-	 * accumulated during the visit - the final condition will an expression
-	 * built from these sub expressions connected with AND
-	 */
-	private ArrayList<Expression> pColConditions = new ArrayList<Expression>();
-
-	/**
-	 * flag used during visit to indicate if a partition key
-	 * was seen
-	 */
-	private boolean sawKey;
+    /**
+     * partition columns associated with the table
+     * present in the load on which the filter whose
+     * inner plan is being visited is applied
+     */
+    private List<String> partitionCols;
+
+    /**
+     * will contain the partition column filter conditions
+     * accumulated during the visit - the final condition will be an expression
+     * built from these sub expressions connected with AND
+     */
+    private ArrayList<Expression> pColConditions = new ArrayList<Expression>();
+
+    /**
+     * flag used during visit to indicate if a partition key
+     * was seen
+     */
+    private boolean sawKey;
 
-	private boolean sawNonKeyCol;
+    private boolean sawNonKeyCol;
 
-	private enum Side { LEFT, RIGHT, NONE };
-	private Side replaceSide = Side.NONE;
+    private enum Side { LEFT, RIGHT, NONE };
+    private Side replaceSide = Side.NONE;
 
-	private boolean filterRemovable = false;
+    private boolean filterRemovable = false;
 
     private boolean canPushDown = true;
 
-	@Override
-	public void visit() throws FrontendException {
-		// we will visit the leaf and it will recursively walk the plan
-		LogicalExpression leaf = (LogicalExpression)plan.getSources().get( 0 );
-		// if the leaf is a unary operator it should be a FilterFunc in
-		// which case we don't try to extract partition filter conditions
-		if(leaf instanceof BinaryExpression) {
-			BinaryExpression binExpr = (BinaryExpression)leaf;
-			visit( binExpr );
-			replaceChild( binExpr );
-			// if the entire expression is to be removed, then the above
-			// replaceChild will not set sawKey to false (sawKey is set to
-			// false only in replaceChild()
-			if(sawKey == true) {
-				//there are only conditions on partition columns in the filter
-				//extract it
-				pColConditions.add( getExpression( leaf ) );
-				filterRemovable = true;
-			}
-		}
-	}
-
-	/**
-	 *
-	 * @param plan logical plan corresponding the filter's comparison condition
-	 * @param partitionCols list of partition columns of the table which is
-	 * being loaded in the LOAD statement which is input to the filter
-	 */
-	public PColFilterExtractor(OperatorPlan plan,
-			List<String> partitionCols) {
-		// though we configure a DepthFirstWalker to be the walker, we will not
-		// use it - we will visit the leaf and it will recursively walk the
-		// plan
-		super( plan, new DepthFirstWalker( plan ) );
-		this.partitionCols = new ArrayList<String>(partitionCols);
-	}
-
-	protected void visit(ProjectExpression project) throws FrontendException {
-		String fieldName = project.getFieldSchema().alias;
-		if(partitionCols.contains(fieldName)) {
-			sawKey = true;
-			// The condition on partition column will be used to prune the
-			// scan and removed from the filter condition. Hence the condition
-			// on the partition column will not be re applied when data is read,
-			// so the following cases should throw error until that changes.
-			List<Class<?>> opsToCheckFor = new ArrayList<Class<?>>();
+    @Override
+    public void visit() throws FrontendException {
+        // we will visit the leaf and it will recursively walk the plan
+        LogicalExpression leaf = (LogicalExpression)plan.getSources().get( 0 );
+        // if the leaf is a unary operator it should be a FilterFunc in
+        // which case we don't try to extract partition filter conditions
+        if(leaf instanceof BinaryExpression) {
+            BinaryExpression binExpr = (BinaryExpression)leaf;
+            visit( binExpr );
+            replaceChild( binExpr );
+            // if the entire expression is to be removed, then the above
+            // replaceChild will not set sawKey to false (after the visit,
+            // only a replaceChild() that trims a side resets sawKey)
+            if(sawKey == true) {
+                //there are only conditions on partition columns in the filter
+                //extract it
+                pColConditions.add( getExpression( leaf ) );
+                filterRemovable = true;
+            }
+        }
+    }
+
+    /**
+     *
+     * @param plan logical plan corresponding the filter's comparison condition
+     * @param partitionCols list of partition columns of the table which is
+     * being loaded in the LOAD statement which is input to the filter
+     */
+    public PColFilterExtractor(OperatorPlan plan,
+            List<String> partitionCols) {
+        // though we configure a DepthFirstWalker to be the walker, we will not
+        // use it - we will visit the leaf and it will recursively walk the
+        // plan
+        super( plan, new DepthFirstWalker( plan ) );
+        this.partitionCols = new ArrayList<String>(partitionCols);
+    }
+
+    protected void visit(ProjectExpression project) throws FrontendException {
+        String fieldName = project.getFieldSchema().alias;
+        if(partitionCols.contains(fieldName)) {
+            sawKey = true;
+            // The condition on partition column will be used to prune the
+            // scan and removed from the filter condition. Hence the condition
+            // on the partition column will not be re-applied when data is read,
+            // so the following cases should throw error until that changes.
+            List<Class<?>> opsToCheckFor = new ArrayList<Class<?>>();
                         opsToCheckFor.add(UserFuncExpression.class);
-			if(checkSuccessors(project, opsToCheckFor)) {
-            LOG.warn("No partition filter push down: " +
-                "You have an partition column ("
-                + fieldName + ") inside a function in the " +
-                "filter condition.");
-            canPushDown = false;
-            return;
-			}
-			opsToCheckFor.set(0, CastExpression.class);
-			if(checkSuccessors(project, opsToCheckFor)) {
-            LOG.warn("No partition filter push down: " +
-                "You have an partition column ("
-                + fieldName + ") inside a cast in the " +
-                "filter condition.");
-            canPushDown = false;
+            if(checkSuccessors(project, opsToCheckFor)) {
+                LOG.warn("No partition filter push down: " +
+                    "You have an partition column ("
+                    + fieldName + ") inside a function in the " +
+                    "filter condition.");
+                canPushDown = false;
+                return;
+            }
+            opsToCheckFor.set(0, CastExpression.class);
+            if(checkSuccessors(project, opsToCheckFor)) {
+                LOG.warn("No partition filter push down: " +
+                    "You have an partition column ("
+                    + fieldName + ") inside a cast in the " +
+                    "filter condition.");
+                canPushDown = false;
+                return;
+            }
+            opsToCheckFor.set(0, IsNullExpression.class);
+            if(checkSuccessors(project, opsToCheckFor)) {
+                LOG.warn("No partition filter push down: " +
+                    "You have an partition column ("
+                    + fieldName + ") inside a null check operator in the " +
+                    "filter condition.");
+                canPushDown = false;
+                return;
+            }
+            opsToCheckFor.set(0, BinCondExpression.class);
+            if(checkSuccessors(project, opsToCheckFor)) {
+                LOG.warn("No partition filter push down: " +
+                    "You have an partition column ("
+                    + fieldName + ") inside a bincond operator in the " +
+                    "filter condition.");
+                canPushDown = false;
+                return;
+            }
+        } else {
+            sawNonKeyCol = true;
+        }
+    }
+
+    /**
+     * Detect whether a non-partition column is present in the expression.
+     * @param binOp
+     * @return true or false
+     * @throws FrontendException
+     */
+    private boolean detectNonPartitionColumn(BinaryExpression binOp) throws FrontendException {
+        LogicalExpression lhs = binOp.getLhs();
+        LogicalExpression rhs = binOp.getRhs();
+        if (lhs instanceof ProjectExpression) {
+            String fieldName = ((ProjectExpression)lhs).getFieldSchema().alias;
+            if(!partitionCols.contains(fieldName)) {
+                return true;
+            }
+        }
+        if (rhs instanceof ProjectExpression) {
+            String fieldName = ((ProjectExpression)rhs).getFieldSchema().alias;
+            if(!partitionCols.contains(fieldName)) {
+                return true;
+            }
+        }
+
+        boolean lhsSawNonKeyCol = false;
+        boolean rhsSawNonKeyCol = false;
+        if (lhs instanceof BinaryExpression) {
+            lhsSawNonKeyCol = detectNonPartitionColumn((BinaryExpression)lhs);
+        }
+        if (rhs instanceof BinaryExpression) {
+            rhsSawNonKeyCol = detectNonPartitionColumn((BinaryExpression)rhs);
+        }
+
+        return lhsSawNonKeyCol || rhsSawNonKeyCol;
+    }
+
+    /**
+     * Detect and/or expressions that contain both partition and non-partition
+     * conditions such as '(pcond and non-pcond) or (pcond and non-pcond)'.
+     * @param binOp
+     * @return true or false
+     * @throws FrontendException
+     */
+    private boolean detectAndOrConditionWithMixedColumns(BinaryExpression binOp) throws FrontendException {
+        LogicalExpression lhs = binOp.getLhs();
+        LogicalExpression rhs = binOp.getRhs();
+
+        if ( (binOp instanceof OrExpression) &&
+             ( (lhs instanceof AndExpression && rhs instanceof AndExpression) ||
+               (lhs instanceof OrExpression || rhs instanceof OrExpression) ) ) {
+            return detectNonPartitionColumn(binOp);
+        }
+
+        return false;
+    }
+
+    private void visit(BinaryExpression binOp) throws FrontendException {
+        boolean lhsSawKey = false;
+        boolean rhsSawKey = false;
+        boolean lhsSawNonKeyCol = false;
+        boolean rhsSawNonKeyCol = false;
+        sawKey = false;
+        sawNonKeyCol = false;
+
+        if (detectAndOrConditionWithMixedColumns(binOp)) {
+            sawNonKeyCol = true;
+            // Don't set canPushDown to false. If there are other AND
+            // conditions on a partition column we want to push that down
+            LOG.warn("No partition filter push down: You have partition and non-partition "
+                    + "columns  in a construction like: "
+                    + "(pcond and non-pcond ..) or (pcond and non-pcond ...) "
+                    + "where pcond is a condition on a partition column and "
+                    + "non-pcond is a condition on a non-partition column.");
             return;
-			}
-			opsToCheckFor.set(0, IsNullExpression.class);
-			if(checkSuccessors(project, opsToCheckFor)) {
+        }
+
+        visit( binOp.getLhs() );
+        replaceChild(binOp.getLhs());
+        lhsSawKey = sawKey;
+        lhsSawNonKeyCol = sawNonKeyCol;
+
+        sawKey = false;
+        sawNonKeyCol = false;
+        visit( binOp.getRhs() );
+        replaceChild(binOp.getRhs());
+        rhsSawKey = sawKey;
+        rhsSawNonKeyCol = sawNonKeyCol;
+
+        // only in the case of an AND, we potentially split the AND to
+        // remove conditions on partition columns out of the AND. For this
+        // we set replaceSide accordingly so that when we reach a predecessor
+        // we can trim the appropriate side. If both sides of the AND have
+        // conditions on partition columns, we will remove the AND completely -
+        // in this case, we will not set replaceSide, but sawKey will be
+        // true so that as we go to higher predecessor ANDs we can trim later.
+        if(binOp instanceof AndExpression) {
+            if(lhsSawKey && rhsSawNonKeyCol){
+                replaceSide = Side.LEFT;
+            }else if(rhsSawKey && lhsSawNonKeyCol){
+                replaceSide = Side.RIGHT;
+            }
+        } else if(lhsSawKey && rhsSawNonKeyCol || rhsSawKey && lhsSawNonKeyCol){
             LOG.warn("No partition filter push down: " +
-                "You have an partition column ("
-                + fieldName + ") inside a null check operator in the " +
-                "filter condition.");
+                "Use of partition column/condition with" +
+                " non partition column/condition in filter expression is not " +
+                "supported.");
             canPushDown = false;
+        }
+
+        sawKey = lhsSawKey || rhsSawKey;
+        sawNonKeyCol = lhsSawNonKeyCol || rhsSawNonKeyCol;
+    }
+
+    /**
+     * @return the condition on partition columns extracted from filter
+     */
+    public  Expression getPColCondition(){
+        if(!canPushDown || pColConditions.size() == 0)
+            return null;
+        Expression cond =  pColConditions.get(0);
+        for(int i=1; i<pColConditions.size(); i++){
+            //if there is more than one condition expression
+            // connect them using "AND"s
+            cond = new Expression.BinaryExpression(cond, pColConditions.get(i),
+                    OpType.OP_AND);
+        }
+        return cond;
+    }
+
+    /**
+     * @return the filterRemovable
+     */
+    public boolean isFilterRemovable() {
+        return canPushDown && filterRemovable;
+    }
+
+    //////// helper methods /////////////////////////
+    /**
+     * check for the presence of a certain operator type in the Successors
+     * @param opToStartFrom
+     * @param opsToCheckFor operators to be checked for at each level of
+     * Successors - the ordering in the list is the order in which the ops
+     * will be checked.
+     * @return true if opsToCheckFor are found
+     * @throws FrontendException
+     */
+    private boolean checkSuccessors(Operator opToStartFrom,
+            List<Class<?>> opsToCheckFor) throws FrontendException {
+        boolean done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
+        if(!done && !opsToCheckFor.isEmpty()) {
+            // continue checking if there is more to check
+            while(!done) {
+                opToStartFrom = plan.getPredecessors(opToStartFrom).get(0);
+                done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
+            }
+        }
+        return opsToCheckFor.isEmpty();
+    }
+
+    private boolean checkSuccessorsHelper(Operator opToStartFrom,
+            List<Class<?>> opsToCheckFor) throws FrontendException {
+        List<Operator> successors = plan.getPredecessors(
+                opToStartFrom);
+        if(successors == null || successors.size() == 0) {
+            return true; // further checking cannot be done
+        }
+        if(successors.size() == 1) {
+            Operator suc  = successors.get(0);
+            if(suc.getClass().getCanonicalName().equals(
+                    opsToCheckFor.get(0).getCanonicalName())) {
+                // trim the list of operators to check
+                opsToCheckFor.remove(0);
+                if(opsToCheckFor.isEmpty()) {
+                    return true; //no further checks required
+                }
+            }
+        } else {
+            logInternalErrorAndSetFlag();
+        }
+        return false; // more checking can be done
+    }
+
+    private void replaceChild(LogicalExpression childExpr) throws FrontendException {
+
+        if(replaceSide == Side.NONE) {
+            // the child is trimmed when the appropriate
+            // flag is set to indicate that it needs to be trimmed.
             return;
-			}
-			opsToCheckFor.set(0, BinCondExpression.class);
-			if(checkSuccessors(project, opsToCheckFor)) {
-            LOG.warn("No partition filter push down: " +
-                "You have an partition column ("
-                + fieldName + ") inside a bincond operator in the " +
-                "filter condition.");
-            canPushDown = false;
+        }
+
+        // eg if replaceSide == Side.LEFT
+        //    binexpop
+        //   /   \ \
+        // child (this is the childExpr argument sent in)
+        //  /  \
+        // Lt   Rt
+        //
+        // gets converted to
+        //  binexpop
+        //  /
+        // Rt
+
+        if( !( childExpr instanceof BinaryExpression ) ) {
+             logInternalErrorAndSetFlag();
+             return;
+        }
+        // child's lhs operand
+        LogicalExpression leftChild =
+            ((BinaryExpression)childExpr).getLhs();
+        // child's rhs operand
+        LogicalExpression rightChild =
+            ((BinaryExpression)childExpr).getRhs();
+
+        plan.disconnect( childExpr, leftChild );
+        plan.disconnect( childExpr, rightChild );
+
+        if(replaceSide == Side.LEFT) {
+            // remove left child and replace childExpr with its right child
+            remove( leftChild );
+            replace(childExpr, rightChild);
+        } else if(replaceSide == Side.RIGHT){
+            // remove right child and replace childExpr with its left child
+            remove(rightChild);
+            replace(childExpr, leftChild);
+        } else {
+            logInternalErrorAndSetFlag();
             return;
-			}
-		} else {
-			sawNonKeyCol = true;
-		}
-	}
-
-	private void visit(BinaryExpression binOp) throws FrontendException {
-		boolean lhsSawKey = false;
-		boolean rhsSawKey = false;
-		boolean lhsSawNonKeyCol = false;
-		boolean rhsSawNonKeyCol = false;
+        }
+        //reset
+        replaceSide = Side.NONE;
         sawKey = false;
-        sawNonKeyCol = false;
 
-        LogicalExpression binLHS = binOp.getLhs();
-        LogicalExpression binRHS = binOp.getRhs();
-        // Take care of nested OR as in
-        // ((cond1 and cond2) or (cond3 and cond4) or (cond5 and cond6)) or (cond7 and cond8)
-        if (binOp instanceof OrExpression &&
-                ((binLHS instanceof AndExpression && binRHS instanceof AndExpression) ||
-                  binLHS instanceof OrExpression || binRHS instanceof OrExpression)) {
-            visit(binLHS);
-            lhsSawNonKeyCol = sawNonKeyCol;
-            this.replaceSide = Side.NONE;
-            visit(binRHS);
-            rhsSawNonKeyCol = sawNonKeyCol;
-            this.replaceSide = Side.NONE;
-            if (lhsSawNonKeyCol || rhsSawNonKeyCol || !canPushDown) {
-                sawKey = false;
-                sawNonKeyCol = true;
-                // Don't set canPushDown to false. If there are other AND
-                // conditions on a partition column we want to push that down
-                LOG.warn("No partition filter push down: You have partition and non-partition "
-                        + "columns  in a construction like: "
-                        + "(pcond and non-pcond ..) or (pcond and non-pcond ...) "
-                        + "where pcond is a condition on a partition column and "
-                        + "non-pcond is a condition on a non-partition column.");
-                return;
-            }
+    }
+
+    private void replace(Operator oldOp, Operator newOp) throws FrontendException {
+        List<Operator> grandParents = plan.getPredecessors( oldOp );
+        if( grandParents == null || grandParents.size() == 0 ) {
+            plan.remove( oldOp );
+            return;
         }
-		visit( binOp.getLhs() );
-		replaceChild(binOp.getLhs());
-		lhsSawKey = sawKey;
-		lhsSawNonKeyCol = sawNonKeyCol;
-
-		sawKey = false;
-		sawNonKeyCol = false;
-		visit( binOp.getRhs() );
-		replaceChild(binOp.getRhs());
-		rhsSawKey = sawKey;
-		rhsSawNonKeyCol = sawNonKeyCol;
-
-		// only in the case of an AND, we potentially split the AND to
-		// remove conditions on partition columns out of the AND. For this
-		// we set replaceSide accordingly so that when we reach a predecessor
-		// we can trim the appropriate side. If both sides of the AND have
-		// conditions on partition columns, we will remove the AND completely -
-		// in this case, we will not set replaceSide, but sawKey will be
-		// true so that as we go to higher predecessor ANDs we can trim later.
-		if(binOp instanceof AndExpression) {
-			if(lhsSawKey && rhsSawNonKeyCol){
-				replaceSide = Side.LEFT;
-			}else if(rhsSawKey && lhsSawNonKeyCol){
-				replaceSide = Side.RIGHT;
-			}
-		} else if(lhsSawKey && rhsSawNonKeyCol || rhsSawKey && lhsSawNonKeyCol){
-        LOG.warn("No partition filter push down: " +
-            "Use of partition column/condition with" +
-            " non partition column/condition in filter expression is not " +
-            "supported.");
-        canPushDown = false;
-		}
+        Operator grandParent = plan.getPredecessors( oldOp ).get( 0 );
+        Pair<Integer, Integer> pair = plan.disconnect( grandParent, oldOp );
+        plan.add( newOp );
+        plan.connect( grandParent, pair.first, newOp, pair.second );
+        plan.remove( oldOp );
+    }
 
-		sawKey = lhsSawKey || rhsSawKey;
-		sawNonKeyCol = lhsSawNonKeyCol || rhsSawNonKeyCol;
-	}
-
-	/**
-	 * @return the condition on partition columns extracted from filter
-	 */
-	public  Expression getPColCondition(){
-    if(!canPushDown || pColConditions.size() == 0)
-        return null;
-		Expression cond =  pColConditions.get(0);
-		for(int i=1; i<pColConditions.size(); i++){
-			//if there is more than one condition expression
-			// connect them using "AND"s
-			cond = new Expression.BinaryExpression(cond, pColConditions.get(i),
-                    OpType.OP_AND);
-		}
-		return cond;
-	}
-
-	/**
-	 * @return the filterRemovable
-	 */
-	public boolean isFilterRemovable() {
-    return canPushDown && filterRemovable;
-	}
-
-	//////// helper methods /////////////////////////
-	/**
-	 * check for the presence of a certain operator type in the Successors
-	 * @param opToStartFrom
-	 * @param opsToCheckFor operators to be checked for at each level of
-	 * Successors - the ordering in the list is the order in which the ops
-	 * will be checked.
-	 * @return true if opsToCheckFor are found
-	 * @throws IOException
-	 */
-	private boolean checkSuccessors(Operator opToStartFrom,
-			List<Class<?>> opsToCheckFor) throws FrontendException {
-		boolean done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
-		if(!done && !opsToCheckFor.isEmpty()) {
-			// continue checking if there is more to check
-			while(!done) {
-				opToStartFrom = plan.getPredecessors(opToStartFrom).get(0);
-				done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
-			}
-		}
-		return opsToCheckFor.isEmpty();
-	}
-
-	private boolean checkSuccessorsHelper(Operator opToStartFrom,
-			List<Class<?>> opsToCheckFor) throws FrontendException {
-		List<Operator> successors = plan.getPredecessors(
-				opToStartFrom);
-		if(successors == null || successors.size() == 0) {
-			return true; // further checking cannot be done
-		}
-		if(successors.size() == 1) {
-			Operator suc  = successors.get(0);
-			if(suc.getClass().getCanonicalName().equals(
-					opsToCheckFor.get(0).getCanonicalName())) {
-				// trim the list of operators to check
-				opsToCheckFor.remove(0);
-				if(opsToCheckFor.isEmpty()) {
-					return true; //no further checks required
-				}
-			}
-		} else {
-		    logInternalErrorAndSetFlag();
-		}
-		return false; // more checking can be done
-	}
-
-	private void replaceChild(LogicalExpression childExpr) throws FrontendException {
-
-		if(replaceSide == Side.NONE) {
-			// the child is trimmed when the appropriate
-			// flag is set to indicate that it needs to be trimmed.
-			return;
-		}
-
-		// eg if replaceSide == Side.LEFT
-		//    binexpop
-		//   /   \ \
-		// child (this is the childExpr argument send in)
-		//  /  \
-		// Lt   Rt
-		//
-		// gets converted to
-		//  binexpop
-		//  /
-		// Rt
-
-		if( !( childExpr instanceof BinaryExpression ) ) {
-		     logInternalErrorAndSetFlag();
-	         return;
-		}
-		// child's lhs operand
-		LogicalExpression leftChild =
-			((BinaryExpression)childExpr).getLhs();
-		// child's rhs operand
-		LogicalExpression rightChild =
-			((BinaryExpression)childExpr).getRhs();
-
-		plan.disconnect( childExpr, leftChild );
-		plan.disconnect( childExpr, rightChild );
-
-		if(replaceSide == Side.LEFT) {
-			// remove left child and replace childExpr with its right child
-			remove( leftChild );
-			replace(childExpr, rightChild);
-		} else if(replaceSide == Side.RIGHT){
-			// remove right child and replace childExpr with its left child
-			remove(rightChild);
-			replace(childExpr, leftChild);
-		}else {
-		     logInternalErrorAndSetFlag();
-	         return;
-		}
-		//reset
-		replaceSide = Side.NONE;
-		sawKey = false;
-
-	}
-
-	private void replace(Operator oldOp, Operator newOp) throws FrontendException {
-		List<Operator> grandParents = plan.getPredecessors( oldOp );
-		if( grandParents == null || grandParents.size() == 0 ) {
-			plan.remove( oldOp );
-			return;
-		}
-		Operator grandParent = plan.getPredecessors( oldOp ).get( 0 );
-		Pair<Integer, Integer> pair = plan.disconnect( grandParent, oldOp );
-		plan.add( newOp );
-		plan.connect( grandParent, pair.first, newOp, pair.second );
-		plan.remove( oldOp );
-	}
-
-	/**
-	 * @param op
-	 * @throws IOException
-	 * @throws IOException
-	 * @throws IOException
-	 */
-	private void remove(LogicalExpression op) throws FrontendException {
-		pColConditions.add( getExpression( op ) );
-		removeTree( op );
-	}
-
-	/**
-	 * Assume that the given operator is already disconnected from its predecessors.
-	 * @param op
-	 * @throws FrontendException
-	 */
-	private void removeTree(Operator op) throws FrontendException {
-		List<Operator> succs = plan.getSuccessors( op );
-		if( succs == null ) {
-			plan.remove( op );
-			return;
-		}
-
-		Operator[] children = new Operator[succs.size()];
-		for( int i = 0; i < succs.size(); i++ ) {
-			children[i] = succs.get(i);
-		}
-
-		for( Operator succ : children ) {
-			plan.disconnect( op, succ );
-			removeTree( succ );
-		}
-
-		plan.remove( op );
-	}
-
-	public Expression getExpression(LogicalExpression op) throws FrontendException
-	 {
-		if(op instanceof ConstantExpression) {
-			ConstantExpression constExpr =(ConstantExpression)op ;
-			return new Expression.Const( constExpr.getValue() );
-		} else if (op instanceof ProjectExpression) {
-			ProjectExpression projExpr = (ProjectExpression)op;
-			String fieldName = projExpr.getFieldSchema().alias;
+    /**
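+     * @param op expression subtree whose converted form is added to
+     *        pColConditions before the subtree is removed from the plan
+     * @throws FrontendException
+     *         propagated from getExpression or removeTree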
+     */
+    private void remove(LogicalExpression op) throws FrontendException {
+        pColConditions.add( getExpression( op ) );
+        removeTree( op );
+    }
+
+    /**
+     * Assume that the given operator is already disconnected from its predecessors.
+     * @param op
+     * @throws FrontendException
+     */
+    private void removeTree(Operator op) throws FrontendException {
+        List<Operator> succs = plan.getSuccessors( op );
+        if( succs == null ) {
+            plan.remove( op );
+            return;
+        }
+
+        Operator[] children = new Operator[succs.size()];
+        for( int i = 0; i < succs.size(); i++ ) {
+            children[i] = succs.get(i);
+        }
+
+        for( Operator succ : children ) {
+            plan.disconnect( op, succ );
+            removeTree( succ );
+        }
+
+        plan.remove( op );
+    }
+
+    public Expression getExpression(LogicalExpression op) throws FrontendException
+     {
+        if(op instanceof ConstantExpression) {
+            ConstantExpression constExpr =(ConstantExpression)op ;
+            return new Expression.Const( constExpr.getValue() );
+        } else if (op instanceof ProjectExpression) {
+            ProjectExpression projExpr = (ProjectExpression)op;
+            String fieldName = projExpr.getFieldSchema().alias;
             return new Expression.Column(fieldName);
         } else {
-			if( !( op instanceof BinaryExpression ) ) {
-            logInternalErrorAndSetFlag();
-            return null;
-			}
-			BinaryExpression binOp = (BinaryExpression)op;
-			if(binOp instanceof AddExpression) {
-				return getExpression( binOp, OpType.OP_PLUS );
-			} else if(binOp instanceof SubtractExpression) {
-				return getExpression(binOp, OpType.OP_MINUS);
-			} else if(binOp instanceof MultiplyExpression) {
-				return getExpression(binOp, OpType.OP_TIMES);
-			} else if(binOp instanceof DivideExpression) {
-				return getExpression(binOp, OpType.OP_DIV);
-			} else if(binOp instanceof ModExpression) {
-				return getExpression(binOp, OpType.OP_MOD);
-			} else if(binOp instanceof AndExpression) {
-				return getExpression(binOp, OpType.OP_AND);
-			} else if(binOp instanceof OrExpression) {
-				return getExpression(binOp, OpType.OP_OR);
-			} else if(binOp instanceof EqualExpression) {
-				return getExpression(binOp, OpType.OP_EQ);
-			} else if(binOp instanceof NotEqualExpression) {
-				return getExpression(binOp, OpType.OP_NE);
-			} else if(binOp instanceof GreaterThanExpression) {
-				return getExpression(binOp, OpType.OP_GT);
-			} else if(binOp instanceof GreaterThanEqualExpression) {
-				return getExpression(binOp, OpType.OP_GE);
-			} else if(binOp instanceof LessThanExpression) {
-				return getExpression(binOp, OpType.OP_LT);
-			} else if(binOp instanceof LessThanEqualExpression) {
-				return getExpression(binOp, OpType.OP_LE);
-                        } else if(binOp instanceof RegexExpression) {
-                                return getExpression(binOp, OpType.OP_MATCH);
-			} else {
-            logInternalErrorAndSetFlag();
-			}
-		}
-		return null;
-	}
+            if( !( op instanceof BinaryExpression ) ) {
+                logInternalErrorAndSetFlag();
+                return null;
+            }
+            BinaryExpression binOp = (BinaryExpression)op;
+            if(binOp instanceof AddExpression) {
+                return getExpression( binOp, OpType.OP_PLUS );
+            } else if(binOp instanceof SubtractExpression) {
+                return getExpression(binOp, OpType.OP_MINUS);
+            } else if(binOp instanceof MultiplyExpression) {
+                return getExpression(binOp, OpType.OP_TIMES);
+            } else if(binOp instanceof DivideExpression) {
+                return getExpression(binOp, OpType.OP_DIV);
+            } else if(binOp instanceof ModExpression) {
+                return getExpression(binOp, OpType.OP_MOD);
+            } else if(binOp instanceof AndExpression) {
+                return getExpression(binOp, OpType.OP_AND);
+            } else if(binOp instanceof OrExpression) {
+                return getExpression(binOp, OpType.OP_OR);
+            } else if(binOp instanceof EqualExpression) {
+                return getExpression(binOp, OpType.OP_EQ);
+            } else if(binOp instanceof NotEqualExpression) {
+                return getExpression(binOp, OpType.OP_NE);
+            } else if(binOp instanceof GreaterThanExpression) {
+                return getExpression(binOp, OpType.OP_GT);
+            } else if(binOp instanceof GreaterThanEqualExpression) {
+                return getExpression(binOp, OpType.OP_GE);
+            } else if(binOp instanceof LessThanExpression) {
+                return getExpression(binOp, OpType.OP_LT);
+            } else if(binOp instanceof LessThanEqualExpression) {
+                return getExpression(binOp, OpType.OP_LE);
+            } else if(binOp instanceof RegexExpression) {
+                return getExpression(binOp, OpType.OP_MATCH);
+            } else {
+                logInternalErrorAndSetFlag();
+            }
+        }
+        return null;
+    }
 
     private Expression getExpression(BinaryExpression binOp, OpType
             opType) throws FrontendException {
         return new Expression.BinaryExpression(getExpression(binOp.getLhs())
-                ,getExpression(binOp.getRhs()), opType);
+                , getExpression(binOp.getRhs()), opType);
     }
 
     private void logInternalErrorAndSetFlag() throws FrontendException {
@@ -488,70 +528,70 @@ public class PColFilterExtractor extends
         canPushDown = false;
     }
 
-	// this might get called from some visit() - in that case, delegate to
-	// the other visit()s which we have defined here
-	private void visit(LogicalExpression op) throws FrontendException {
-		if(op instanceof ProjectExpression) {
-			visit((ProjectExpression)op);
-		} else if (op instanceof BinaryExpression) {
-			visit((BinaryExpression)op);
-		} else if (op instanceof CastExpression) {
-			visit((CastExpression)op);
-		} else if (op instanceof BinCondExpression) {
-			visit((BinCondExpression)op);
-		} else if (op instanceof UserFuncExpression) {
-			visit((UserFuncExpression)op);
-		} else if (op instanceof IsNullExpression) {
-			visit((IsNullExpression)op);
-		} else if( op instanceof NotExpression ) {
-			visit( (NotExpression)op );
-		} else if( op instanceof RegexExpression ) {
-			visit( (RegexExpression)op );
+    // this might get called from some visit() - in that case, delegate to
+    // the other visit()s which we have defined here
+    private void visit(LogicalExpression op) throws FrontendException {
+        if(op instanceof ProjectExpression) {
+            visit((ProjectExpression)op);
+        } else if (op instanceof BinaryExpression) {
+            visit((BinaryExpression)op);
+        } else if (op instanceof CastExpression) {
+            visit((CastExpression)op);
+        } else if (op instanceof BinCondExpression) {
+            visit((BinCondExpression)op);
+        } else if (op instanceof UserFuncExpression) {
+            visit((UserFuncExpression)op);
+        } else if (op instanceof IsNullExpression) {
+            visit((IsNullExpression)op);
+        } else if( op instanceof NotExpression ) {
+            visit( (NotExpression)op );
+        } else if( op instanceof RegexExpression ) {
+            visit( (RegexExpression)op );
         } else if (op instanceof MapLookupExpression) {
             visit((MapLookupExpression) op);
         } else if (op instanceof DereferenceExpression) {
             visit((DereferenceExpression) op);
         }
-	}
+    }
+
+    // Visitors for specific expression types: each forwards its operand(s) (regex:
+    // itself, as a BinaryExpression) to another visit() to catch unsupported cases.
+    private void visit(CastExpression cast) throws FrontendException {
+        visit(cast.getExpression());
+    }
+
+    private void visit(NotExpression not) throws FrontendException {
+        visit(not.getExpression());
+    }
+
+    private void visit(RegexExpression regexp) throws FrontendException {
+        visit((BinaryExpression)regexp);
+    }
+
+    private void visit(BinCondExpression binCond) throws FrontendException {
+        visit(binCond.getCondition());
+        visit(binCond.getLhs());
+        visit(binCond.getRhs());
+    }
+
+    private void visit(UserFuncExpression udf) throws FrontendException {
+        for (LogicalExpression op : udf.getArguments()) {
+            visit(op);
+        }
+    }
 
-	// some specific operators which are of interest to catch some
-	// unsupported scenarios
-	private void visit(CastExpression cast) throws FrontendException {
-		visit(cast.getExpression());
-	}
-
-	private void visit(NotExpression not) throws FrontendException {
-		visit(not.getExpression());
-	}
-
-	private void visit(RegexExpression regexp) throws FrontendException {
-		visit((BinaryExpression)regexp);
-	}
-
-	private void visit(BinCondExpression binCond) throws FrontendException {
-		visit(binCond.getCondition());
-		visit(binCond.getLhs());
-		visit(binCond.getRhs());
-	}
-
-	private void visit(UserFuncExpression udf) throws FrontendException {
-		for (LogicalExpression op : udf.getArguments()) {
-			visit(op);
-		}
-	}
-
-	private void visit(IsNullExpression isNull) throws FrontendException {
-		visit(isNull.getExpression());
-	}
-	
-	private void visit(MapLookupExpression mapLookup) throws FrontendException {
+    private void visit(IsNullExpression isNull) throws FrontendException {
+        visit(isNull.getExpression());
+    }
+
+    private void visit(MapLookupExpression mapLookup) throws FrontendException {
         visit(mapLookup.getMap());
     }
-	
-	private void visit(DereferenceExpression deref) throws FrontendException {
+
+    private void visit(DereferenceExpression deref) throws FrontendException {
         visit(deref.getReferredExpression());
     }
-	
+
     public boolean canPushDown() {
         return canPushDown;
     }

Modified: pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java?rev=1509455&r1=1509454&r2=1509455&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java Thu Aug  1 22:17:32 2013
@@ -720,6 +720,99 @@ public class TestPartitionFilterPushDown
         test(q, Arrays.asList("srcid"), null, "(name matches 'foo*')");
     }
 
+    /**
+     * Regression test for PIG-3395 (large filter expression makes Pig hang).
+     * @throws Exception propagated from the test() helper
+     */
+    @Test
+    public void testLargeAndOrCondition() throws Exception {
+        String q = query + "b = filter a by " +
+                "(srcid == 1 and mrkt == '2' and dstid == 3) " +
+                "or (srcid == 4 and mrkt == '5' and dstid == 6) " +
+                "or (srcid == 7 and mrkt == '8' and dstid == 9) " +
+                "or (srcid == 10 and mrkt == '11' and dstid == 12) " +
+                "or (srcid == 13 and mrkt == '14' and dstid == 15) " +
+                "or (srcid == 16 and mrkt == '17' and dstid == 18) " +
+                "or (srcid == 19 and mrkt == '20' and dstid == 21) " +
+                "or (srcid == 22 and mrkt == '23' and dstid == 24) " +
+                "or (srcid == 25 and mrkt == '26' and dstid == 27) " +
+                "or (srcid == 28 and mrkt == '29' and dstid == 30) " +
+                "or (srcid == 31 and mrkt == '32' and dstid == 33) " +
+                "or (srcid == 34 and mrkt == '35' and dstid == 36) " +
+                "or (srcid == 37 and mrkt == '38' and dstid == 39) " +
+                "or (srcid == 40 and mrkt == '41' and dstid == 42) " +
+                "or (srcid == 43 and mrkt == '44' and dstid == 45) " +
+                "or (srcid == 46 and mrkt == '47' and dstid == 48) " +
+                "or (srcid == 49 and mrkt == '50' and dstid == 51) " +
+                "or (srcid == 52 and mrkt == '53' and dstid == 54) " +
+                "or (srcid == 55 and mrkt == '56' and dstid == 57) " +
+                "or (srcid == 58 and mrkt == '59' and dstid == 60) " +
+                "or (srcid == 61 and mrkt == '62' and dstid == 63) " +
+                "or (srcid == 64 and mrkt == '65' and dstid == 66) " +
+                "or (srcid == 67 and mrkt == '68' and dstid == 69);" +
+                "store b into 'out';";
+        test(q, Arrays.asList("srcid", "mrkt", "dstid"),
+                "(((((((((((((((((((((((((srcid == 1) and (mrkt == '2')) and (dstid == 3)) " +
+                "or (((srcid == 4) and (mrkt == '5')) and (dstid == 6))) " +
+                "or (((srcid == 7) and (mrkt == '8')) and (dstid == 9))) " +
+                "or (((srcid == 10) and (mrkt == '11')) and (dstid == 12))) " +
+                "or (((srcid == 13) and (mrkt == '14')) and (dstid == 15))) " +
+                "or (((srcid == 16) and (mrkt == '17')) and (dstid == 18))) " +
+                "or (((srcid == 19) and (mrkt == '20')) and (dstid == 21))) " +
+                "or (((srcid == 22) and (mrkt == '23')) and (dstid == 24))) " +
+                "or (((srcid == 25) and (mrkt == '26')) and (dstid == 27))) " +
+                "or (((srcid == 28) and (mrkt == '29')) and (dstid == 30))) " +
+                "or (((srcid == 31) and (mrkt == '32')) and (dstid == 33))) " +
+                "or (((srcid == 34) and (mrkt == '35')) and (dstid == 36))) " +
+                "or (((srcid == 37) and (mrkt == '38')) and (dstid == 39))) " +
+                "or (((srcid == 40) and (mrkt == '41')) and (dstid == 42))) " +
+                "or (((srcid == 43) and (mrkt == '44')) and (dstid == 45))) " +
+                "or (((srcid == 46) and (mrkt == '47')) and (dstid == 48))) " +
+                "or (((srcid == 49) and (mrkt == '50')) and (dstid == 51))) " +
+                "or (((srcid == 52) and (mrkt == '53')) and (dstid == 54))) " +
+                "or (((srcid == 55) and (mrkt == '56')) and (dstid == 57))) " +
+                "or (((srcid == 58) and (mrkt == '59')) and (dstid == 60))) " +
+                "or (((srcid == 61) and (mrkt == '62')) and (dstid == 63))) " +
+                "or (((srcid == 64) and (mrkt == '65')) and (dstid == 66))) " +
+                "or (((srcid == 67) and (mrkt == '68')) and (dstid == 69)))",
+                null);
+    }
+
+    // A UDF call (UPPER(name)) OR-ed with partition predicates must reject the whole filter
+    @Test
+    public void testAndOrConditionMixedWithUdfExpr() throws Exception {
+        String q = query + "b = filter a by " +
+                "(UPPER(name) == 'FOO') " +
+                "or (srcid == 1 and mrkt == '2' and dstid == 3) " +
+                "or (srcid == 4 and mrkt == '5' and dstid == 6) " +
+                "or (srcid == 7 and mrkt == '8' and dstid == 9);" +
+                "store b into 'out';";
+        negativeTest(q, Arrays.asList("srcid", "mrkt", "dstid"));
+    }
+
+    // A cast on partition column mrkt ((int)mrkt) inside the OR chain must reject the whole filter
+    @Test
+    public void testAndOrConditionMixedWithCastExpr() throws Exception {
+        String q = query + "b = filter a by " +
+                "(srcid == 1 and mrkt == '2' and dstid == 3) " +
+                "or (srcid == 4 and (int)mrkt == 5 and dstid == 6) " +
+                "or (srcid == 7 and mrkt == '8' and dstid == 9);" +
+                "store b into 'out';";
+        negativeTest(q, Arrays.asList("srcid", "mrkt", "dstid"));
+    }
+
+    // An 'is null' check (on non-partition column name) in the OR chain must reject the whole filter
+    @Test
+    public void testAndOrConditionMixedWithNullExpr() throws Exception {
+        String q = query + "b = filter a by " +
+                "(srcid == 1 and mrkt == '2' and dstid == 3) " +
+                "or (srcid == 4 and mrkt == '5' and dstid == 6) " +
+                "or (srcid == 7 and mrkt == '8' and dstid == 9) " +
+                "or (name is null);" +
+                "store b into 'out';";
+        negativeTest(q, Arrays.asList("srcid", "mrkt", "dstid"));
+    }
+
     //// helper methods ///////
 
     private PColFilterExtractor test(String query, List<String> partitionCols,
@@ -773,6 +866,20 @@ public class TestPartitionFilterPushDown
         return pColExtractor;
     }
 
+    // Asserts that the filter in the query cannot be pushed down: canPushDown()
+    // must be false and PColFilterExtractor.getPColCondition() must return null.
+    private void negativeTest(String query, List<String> partitionCols) throws Exception {
+        PigServer pigServer = new PigServer( pc );
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
+        Operator op = newLogicalPlan.getSinks().get(0);
+        LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
+        PColFilterExtractor pColExtractor = new PColFilterExtractor(
+                filter.getFilterPlan(), partitionCols);
+        pColExtractor.visit();
+        Assert.assertFalse(pColExtractor.canPushDown());
+        Assert.assertNull(pColExtractor.getPColCondition());
+    }
+
     private void negativeTest(String query, List<String> partitionCols,
             int expectedErrorCode) throws Exception {
         PigServer pigServer = new PigServer( pc );