You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/08/02 00:17:32 UTC
svn commit: r1509455 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/newplan/PColFilterExtractor.java
test/org/apache/pig/test/TestPartitionFilterPushDown.java
Author: cheolsoo
Date: Thu Aug 1 22:17:32 2013
New Revision: 1509455
URL: http://svn.apache.org/r1509455
Log:
PIG-3395: Large filter expression makes Pig hang (cheolsoo)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1509455&r1=1509454&r2=1509455&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 1 22:17:32 2013
@@ -200,6 +200,8 @@ PIG-2910: Add function to read schema fr
OPTIMIZATIONS
+PIG-3395: Large filter expression makes Pig hang (cheolsoo)
+
PIG-3123: Simplify Logical Plans By Removing Unneccessary Identity Projections (njw45 via cheolsoo)
PIG-3013: BinInterSedes improve chararray sort performance (rohini)
Modified: pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java?rev=1509455&r1=1509454&r2=1509455&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/PColFilterExtractor.java Thu Aug 1 22:17:32 2013
@@ -68,417 +68,457 @@ public class PColFilterExtractor extends
private static final Log LOG = LogFactory.getLog(PColFilterExtractor.class);
- /**
- * partition columns associated with the table
- * present in the load on which the filter whose
- * inner plan is being visited is applied
- */
- private List<String> partitionCols;
-
- /**
- * will contain the partition column filter conditions
- * accumulated during the visit - the final condition will an expression
- * built from these sub expressions connected with AND
- */
- private ArrayList<Expression> pColConditions = new ArrayList<Expression>();
-
- /**
- * flag used during visit to indicate if a partition key
- * was seen
- */
- private boolean sawKey;
+ /**
+ * partition columns associated with the table
+ * present in the load on which the filter whose
+ * inner plan is being visited is applied
+ */
+ private List<String> partitionCols;
+
+ /**
+ * will contain the partition column filter conditions
+ * accumulated during the visit - the final condition will be an expression
+ * built from these sub expressions connected with AND
+ */
+ private ArrayList<Expression> pColConditions = new ArrayList<Expression>();
+
+ /**
+ * flag used during visit to indicate if a partition key
+ * was seen
+ */
+ private boolean sawKey;
- private boolean sawNonKeyCol;
+ private boolean sawNonKeyCol;
- private enum Side { LEFT, RIGHT, NONE };
- private Side replaceSide = Side.NONE;
+ private enum Side { LEFT, RIGHT, NONE };
+ private Side replaceSide = Side.NONE;
- private boolean filterRemovable = false;
+ private boolean filterRemovable = false;
private boolean canPushDown = true;
- @Override
- public void visit() throws FrontendException {
- // we will visit the leaf and it will recursively walk the plan
- LogicalExpression leaf = (LogicalExpression)plan.getSources().get( 0 );
- // if the leaf is a unary operator it should be a FilterFunc in
- // which case we don't try to extract partition filter conditions
- if(leaf instanceof BinaryExpression) {
- BinaryExpression binExpr = (BinaryExpression)leaf;
- visit( binExpr );
- replaceChild( binExpr );
- // if the entire expression is to be removed, then the above
- // replaceChild will not set sawKey to false (sawKey is set to
- // false only in replaceChild()
- if(sawKey == true) {
- //there are only conditions on partition columns in the filter
- //extract it
- pColConditions.add( getExpression( leaf ) );
- filterRemovable = true;
- }
- }
- }
-
- /**
- *
- * @param plan logical plan corresponding the filter's comparison condition
- * @param partitionCols list of partition columns of the table which is
- * being loaded in the LOAD statement which is input to the filter
- */
- public PColFilterExtractor(OperatorPlan plan,
- List<String> partitionCols) {
- // though we configure a DepthFirstWalker to be the walker, we will not
- // use it - we will visit the leaf and it will recursively walk the
- // plan
- super( plan, new DepthFirstWalker( plan ) );
- this.partitionCols = new ArrayList<String>(partitionCols);
- }
-
- protected void visit(ProjectExpression project) throws FrontendException {
- String fieldName = project.getFieldSchema().alias;
- if(partitionCols.contains(fieldName)) {
- sawKey = true;
- // The condition on partition column will be used to prune the
- // scan and removed from the filter condition. Hence the condition
- // on the partition column will not be re applied when data is read,
- // so the following cases should throw error until that changes.
- List<Class<?>> opsToCheckFor = new ArrayList<Class<?>>();
+ @Override
+ public void visit() throws FrontendException {
+ // we will visit the leaf and it will recursively walk the plan
+ LogicalExpression leaf = (LogicalExpression)plan.getSources().get( 0 );
+ // if the leaf is a unary operator it should be a FilterFunc in
+ // which case we don't try to extract partition filter conditions
+ if(leaf instanceof BinaryExpression) {
+ BinaryExpression binExpr = (BinaryExpression)leaf;
+ visit( binExpr );
+ replaceChild( binExpr );
+ // if the entire expression is to be removed, then the above
+ // replaceChild will not set sawKey to false (sawKey is set to
+ // false only in replaceChild())
+ if(sawKey == true) {
+ //there are only conditions on partition columns in the filter
+ //extract it
+ pColConditions.add( getExpression( leaf ) );
+ filterRemovable = true;
+ }
+ }
+ }
+
+ /**
+ *
+ * @param plan logical plan corresponding to the filter's comparison condition
+ * @param partitionCols list of partition columns of the table which is
+ * being loaded in the LOAD statement which is input to the filter
+ */
+ public PColFilterExtractor(OperatorPlan plan,
+ List<String> partitionCols) {
+ // though we configure a DepthFirstWalker to be the walker, we will not
+ // use it - we will visit the leaf and it will recursively walk the
+ // plan
+ super( plan, new DepthFirstWalker( plan ) );
+ this.partitionCols = new ArrayList<String>(partitionCols);
+ }
+
+ protected void visit(ProjectExpression project) throws FrontendException {
+ String fieldName = project.getFieldSchema().alias;
+ if(partitionCols.contains(fieldName)) {
+ sawKey = true;
+ // The condition on partition column will be used to prune the
+ // scan and removed from the filter condition. Hence the condition
+ // on the partition column will not be re applied when data is read,
+ // so the following cases should throw error until that changes.
+ List<Class<?>> opsToCheckFor = new ArrayList<Class<?>>();
opsToCheckFor.add(UserFuncExpression.class);
- if(checkSuccessors(project, opsToCheckFor)) {
- LOG.warn("No partition filter push down: " +
- "You have an partition column ("
- + fieldName + ") inside a function in the " +
- "filter condition.");
- canPushDown = false;
- return;
- }
- opsToCheckFor.set(0, CastExpression.class);
- if(checkSuccessors(project, opsToCheckFor)) {
- LOG.warn("No partition filter push down: " +
- "You have an partition column ("
- + fieldName + ") inside a cast in the " +
- "filter condition.");
- canPushDown = false;
+ if(checkSuccessors(project, opsToCheckFor)) {
+ LOG.warn("No partition filter push down: " +
+ "You have an partition column ("
+ + fieldName + ") inside a function in the " +
+ "filter condition.");
+ canPushDown = false;
+ return;
+ }
+ opsToCheckFor.set(0, CastExpression.class);
+ if(checkSuccessors(project, opsToCheckFor)) {
+ LOG.warn("No partition filter push down: " +
+ "You have an partition column ("
+ + fieldName + ") inside a cast in the " +
+ "filter condition.");
+ canPushDown = false;
+ return;
+ }
+ opsToCheckFor.set(0, IsNullExpression.class);
+ if(checkSuccessors(project, opsToCheckFor)) {
+ LOG.warn("No partition filter push down: " +
+ "You have an partition column ("
+ + fieldName + ") inside a null check operator in the " +
+ "filter condition.");
+ canPushDown = false;
+ return;
+ }
+ opsToCheckFor.set(0, BinCondExpression.class);
+ if(checkSuccessors(project, opsToCheckFor)) {
+ LOG.warn("No partition filter push down: " +
+ "You have an partition column ("
+ + fieldName + ") inside a bincond operator in the " +
+ "filter condition.");
+ canPushDown = false;
+ return;
+ }
+ } else {
+ sawNonKeyCol = true;
+ }
+ }
+
+ /**
+ * Detect whether a non-partition column is present in the expression.
+ * @param binOp
+ * @return true or false
+ * @throws FrontendException
+ */
+ private boolean detectNonPartitionColumn(BinaryExpression binOp) throws FrontendException {
+ LogicalExpression lhs = binOp.getLhs();
+ LogicalExpression rhs = binOp.getRhs();
+ if (lhs instanceof ProjectExpression) {
+ String fieldName = ((ProjectExpression)lhs).getFieldSchema().alias;
+ if(!partitionCols.contains(fieldName)) {
+ return true;
+ }
+ }
+ if (rhs instanceof ProjectExpression) {
+ String fieldName = ((ProjectExpression)rhs).getFieldSchema().alias;
+ if(!partitionCols.contains(fieldName)) {
+ return true;
+ }
+ }
+
+ boolean lhsSawNonKeyCol = false;
+ boolean rhsSawNonKeyCol = false;
+ if (lhs instanceof BinaryExpression) {
+ lhsSawNonKeyCol = detectNonPartitionColumn((BinaryExpression)lhs);
+ }
+ if (rhs instanceof BinaryExpression) {
+ rhsSawNonKeyCol = detectNonPartitionColumn((BinaryExpression)rhs);
+ }
+
+ return lhsSawNonKeyCol || rhsSawNonKeyCol;
+ }
+
+ /**
+ * Detect and/or expressions that contain both partition and non-partition
+ * conditions such as '(pcond and non-pcond) or (pcond and non-pcond)'.
+ * @param binOp
+ * @return true or false
+ * @throws FrontendException
+ */
+ private boolean detectAndOrConditionWithMixedColumns(BinaryExpression binOp) throws FrontendException {
+ LogicalExpression lhs = binOp.getLhs();
+ LogicalExpression rhs = binOp.getRhs();
+
+ if ( (binOp instanceof OrExpression) &&
+ ( (lhs instanceof AndExpression && rhs instanceof AndExpression) ||
+ (lhs instanceof OrExpression || rhs instanceof OrExpression) ) ) {
+ return detectNonPartitionColumn(binOp);
+ }
+
+ return false;
+ }
+
+ private void visit(BinaryExpression binOp) throws FrontendException {
+ boolean lhsSawKey = false;
+ boolean rhsSawKey = false;
+ boolean lhsSawNonKeyCol = false;
+ boolean rhsSawNonKeyCol = false;
+ sawKey = false;
+ sawNonKeyCol = false;
+
+ if (detectAndOrConditionWithMixedColumns(binOp)) {
+ sawNonKeyCol = true;
+ // Don't set canPushDown to false. If there are other AND
+ // conditions on a partition column we want to push that down
+ LOG.warn("No partition filter push down: You have partition and non-partition "
+ + "columns in a construction like: "
+ + "(pcond and non-pcond ..) or (pcond and non-pcond ...) "
+ + "where pcond is a condition on a partition column and "
+ + "non-pcond is a condition on a non-partition column.");
return;
- }
- opsToCheckFor.set(0, IsNullExpression.class);
- if(checkSuccessors(project, opsToCheckFor)) {
+ }
+
+ visit( binOp.getLhs() );
+ replaceChild(binOp.getLhs());
+ lhsSawKey = sawKey;
+ lhsSawNonKeyCol = sawNonKeyCol;
+
+ sawKey = false;
+ sawNonKeyCol = false;
+ visit( binOp.getRhs() );
+ replaceChild(binOp.getRhs());
+ rhsSawKey = sawKey;
+ rhsSawNonKeyCol = sawNonKeyCol;
+
+ // only in the case of an AND, we potentially split the AND to
+ // remove conditions on partition columns out of the AND. For this
+ // we set replaceSide accordingly so that when we reach a predecessor
+ // we can trim the appropriate side. If both sides of the AND have
+ // conditions on partition columns, we will remove the AND completely -
+ // in this case, we will not set replaceSide, but sawKey will be
+ // true so that as we go to higher predecessor ANDs we can trim later.
+ if(binOp instanceof AndExpression) {
+ if(lhsSawKey && rhsSawNonKeyCol){
+ replaceSide = Side.LEFT;
+ }else if(rhsSawKey && lhsSawNonKeyCol){
+ replaceSide = Side.RIGHT;
+ }
+ } else if(lhsSawKey && rhsSawNonKeyCol || rhsSawKey && lhsSawNonKeyCol){
LOG.warn("No partition filter push down: " +
- "You have an partition column ("
- + fieldName + ") inside a null check operator in the " +
- "filter condition.");
+ "Use of partition column/condition with" +
+ " non partition column/condition in filter expression is not " +
+ "supported.");
canPushDown = false;
+ }
+
+ sawKey = lhsSawKey || rhsSawKey;
+ sawNonKeyCol = lhsSawNonKeyCol || rhsSawNonKeyCol;
+ }
+
+ /**
+ * @return the condition on partition columns extracted from filter
+ */
+ public Expression getPColCondition(){
+ if(!canPushDown || pColConditions.size() == 0)
+ return null;
+ Expression cond = pColConditions.get(0);
+ for(int i=1; i<pColConditions.size(); i++){
+ //if there is more than one condition expression
+ // connect them using "AND"s
+ cond = new Expression.BinaryExpression(cond, pColConditions.get(i),
+ OpType.OP_AND);
+ }
+ return cond;
+ }
+
+ /**
+ * @return the filterRemovable
+ */
+ public boolean isFilterRemovable() {
+ return canPushDown && filterRemovable;
+ }
+
+ //////// helper methods /////////////////////////
+ /**
+ * check for the presence of a certain operator type in the Successors
+ * @param opToStartFrom
+ * @param opsToCheckFor operators to be checked for at each level of
+ * Successors - the ordering in the list is the order in which the ops
+ * will be checked.
+ * @return true if opsToCheckFor are found
+ * @throws FrontendException
+ */
+ private boolean checkSuccessors(Operator opToStartFrom,
+ List<Class<?>> opsToCheckFor) throws FrontendException {
+ boolean done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
+ if(!done && !opsToCheckFor.isEmpty()) {
+ // continue checking if there is more to check
+ while(!done) {
+ opToStartFrom = plan.getPredecessors(opToStartFrom).get(0);
+ done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
+ }
+ }
+ return opsToCheckFor.isEmpty();
+ }
+
+ private boolean checkSuccessorsHelper(Operator opToStartFrom,
+ List<Class<?>> opsToCheckFor) throws FrontendException {
+ List<Operator> successors = plan.getPredecessors(
+ opToStartFrom);
+ if(successors == null || successors.size() == 0) {
+ return true; // further checking cannot be done
+ }
+ if(successors.size() == 1) {
+ Operator suc = successors.get(0);
+ if(suc.getClass().getCanonicalName().equals(
+ opsToCheckFor.get(0).getCanonicalName())) {
+ // trim the list of operators to check
+ opsToCheckFor.remove(0);
+ if(opsToCheckFor.isEmpty()) {
+ return true; //no further checks required
+ }
+ }
+ } else {
+ logInternalErrorAndSetFlag();
+ }
+ return false; // more checking can be done
+ }
+
+ private void replaceChild(LogicalExpression childExpr) throws FrontendException {
+
+ if(replaceSide == Side.NONE) {
+ // the child is trimmed when the appropriate
+ // flag is set to indicate that it needs to be trimmed.
return;
- }
- opsToCheckFor.set(0, BinCondExpression.class);
- if(checkSuccessors(project, opsToCheckFor)) {
- LOG.warn("No partition filter push down: " +
- "You have an partition column ("
- + fieldName + ") inside a bincond operator in the " +
- "filter condition.");
- canPushDown = false;
+ }
+
+ // eg if replaceSide == Side.LEFT
+ // binexpop
+ // / \ \
+ // child (this is the childExpr argument sent in)
+ // / \
+ // Lt Rt
+ //
+ // gets converted to
+ // binexpop
+ // /
+ // Rt
+
+ if( !( childExpr instanceof BinaryExpression ) ) {
+ logInternalErrorAndSetFlag();
+ return;
+ }
+ // child's lhs operand
+ LogicalExpression leftChild =
+ ((BinaryExpression)childExpr).getLhs();
+ // child's rhs operand
+ LogicalExpression rightChild =
+ ((BinaryExpression)childExpr).getRhs();
+
+ plan.disconnect( childExpr, leftChild );
+ plan.disconnect( childExpr, rightChild );
+
+ if(replaceSide == Side.LEFT) {
+ // remove left child and replace childExpr with its right child
+ remove( leftChild );
+ replace(childExpr, rightChild);
+ } else if(replaceSide == Side.RIGHT){
+ // remove right child and replace childExpr with its left child
+ remove(rightChild);
+ replace(childExpr, leftChild);
+ } else {
+ logInternalErrorAndSetFlag();
return;
- }
- } else {
- sawNonKeyCol = true;
- }
- }
-
- private void visit(BinaryExpression binOp) throws FrontendException {
- boolean lhsSawKey = false;
- boolean rhsSawKey = false;
- boolean lhsSawNonKeyCol = false;
- boolean rhsSawNonKeyCol = false;
+ }
+ //reset
+ replaceSide = Side.NONE;
sawKey = false;
- sawNonKeyCol = false;
- LogicalExpression binLHS = binOp.getLhs();
- LogicalExpression binRHS = binOp.getRhs();
- // Take care of nested OR as in
- // ((cond1 and cond2) or (cond3 and cond4) or (cond5 and cond6)) or (cond7 and cond8)
- if (binOp instanceof OrExpression &&
- ((binLHS instanceof AndExpression && binRHS instanceof AndExpression) ||
- binLHS instanceof OrExpression || binRHS instanceof OrExpression)) {
- visit(binLHS);
- lhsSawNonKeyCol = sawNonKeyCol;
- this.replaceSide = Side.NONE;
- visit(binRHS);
- rhsSawNonKeyCol = sawNonKeyCol;
- this.replaceSide = Side.NONE;
- if (lhsSawNonKeyCol || rhsSawNonKeyCol || !canPushDown) {
- sawKey = false;
- sawNonKeyCol = true;
- // Don't set canPushDown to false. If there are other AND
- // conditions on a partition column we want to push that down
- LOG.warn("No partition filter push down: You have partition and non-partition "
- + "columns in a construction like: "
- + "(pcond and non-pcond ..) or (pcond and non-pcond ...) "
- + "where pcond is a condition on a partition column and "
- + "non-pcond is a condition on a non-partition column.");
- return;
- }
+ }
+
+ private void replace(Operator oldOp, Operator newOp) throws FrontendException {
+ List<Operator> grandParents = plan.getPredecessors( oldOp );
+ if( grandParents == null || grandParents.size() == 0 ) {
+ plan.remove( oldOp );
+ return;
}
- visit( binOp.getLhs() );
- replaceChild(binOp.getLhs());
- lhsSawKey = sawKey;
- lhsSawNonKeyCol = sawNonKeyCol;
-
- sawKey = false;
- sawNonKeyCol = false;
- visit( binOp.getRhs() );
- replaceChild(binOp.getRhs());
- rhsSawKey = sawKey;
- rhsSawNonKeyCol = sawNonKeyCol;
-
- // only in the case of an AND, we potentially split the AND to
- // remove conditions on partition columns out of the AND. For this
- // we set replaceSide accordingly so that when we reach a predecessor
- // we can trim the appropriate side. If both sides of the AND have
- // conditions on partition columns, we will remove the AND completely -
- // in this case, we will not set replaceSide, but sawKey will be
- // true so that as we go to higher predecessor ANDs we can trim later.
- if(binOp instanceof AndExpression) {
- if(lhsSawKey && rhsSawNonKeyCol){
- replaceSide = Side.LEFT;
- }else if(rhsSawKey && lhsSawNonKeyCol){
- replaceSide = Side.RIGHT;
- }
- } else if(lhsSawKey && rhsSawNonKeyCol || rhsSawKey && lhsSawNonKeyCol){
- LOG.warn("No partition filter push down: " +
- "Use of partition column/condition with" +
- " non partition column/condition in filter expression is not " +
- "supported.");
- canPushDown = false;
- }
+ Operator grandParent = plan.getPredecessors( oldOp ).get( 0 );
+ Pair<Integer, Integer> pair = plan.disconnect( grandParent, oldOp );
+ plan.add( newOp );
+ plan.connect( grandParent, pair.first, newOp, pair.second );
+ plan.remove( oldOp );
+ }
- sawKey = lhsSawKey || rhsSawKey;
- sawNonKeyCol = lhsSawNonKeyCol || rhsSawNonKeyCol;
- }
-
- /**
- * @return the condition on partition columns extracted from filter
- */
- public Expression getPColCondition(){
- if(!canPushDown || pColConditions.size() == 0)
- return null;
- Expression cond = pColConditions.get(0);
- for(int i=1; i<pColConditions.size(); i++){
- //if there is more than one condition expression
- // connect them using "AND"s
- cond = new Expression.BinaryExpression(cond, pColConditions.get(i),
- OpType.OP_AND);
- }
- return cond;
- }
-
- /**
- * @return the filterRemovable
- */
- public boolean isFilterRemovable() {
- return canPushDown && filterRemovable;
- }
-
- //////// helper methods /////////////////////////
- /**
- * check for the presence of a certain operator type in the Successors
- * @param opToStartFrom
- * @param opsToCheckFor operators to be checked for at each level of
- * Successors - the ordering in the list is the order in which the ops
- * will be checked.
- * @return true if opsToCheckFor are found
- * @throws IOException
- */
- private boolean checkSuccessors(Operator opToStartFrom,
- List<Class<?>> opsToCheckFor) throws FrontendException {
- boolean done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
- if(!done && !opsToCheckFor.isEmpty()) {
- // continue checking if there is more to check
- while(!done) {
- opToStartFrom = plan.getPredecessors(opToStartFrom).get(0);
- done = checkSuccessorsHelper(opToStartFrom, opsToCheckFor);
- }
- }
- return opsToCheckFor.isEmpty();
- }
-
- private boolean checkSuccessorsHelper(Operator opToStartFrom,
- List<Class<?>> opsToCheckFor) throws FrontendException {
- List<Operator> successors = plan.getPredecessors(
- opToStartFrom);
- if(successors == null || successors.size() == 0) {
- return true; // further checking cannot be done
- }
- if(successors.size() == 1) {
- Operator suc = successors.get(0);
- if(suc.getClass().getCanonicalName().equals(
- opsToCheckFor.get(0).getCanonicalName())) {
- // trim the list of operators to check
- opsToCheckFor.remove(0);
- if(opsToCheckFor.isEmpty()) {
- return true; //no further checks required
- }
- }
- } else {
- logInternalErrorAndSetFlag();
- }
- return false; // more checking can be done
- }
-
- private void replaceChild(LogicalExpression childExpr) throws FrontendException {
-
- if(replaceSide == Side.NONE) {
- // the child is trimmed when the appropriate
- // flag is set to indicate that it needs to be trimmed.
- return;
- }
-
- // eg if replaceSide == Side.LEFT
- // binexpop
- // / \ \
- // child (this is the childExpr argument send in)
- // / \
- // Lt Rt
- //
- // gets converted to
- // binexpop
- // /
- // Rt
-
- if( !( childExpr instanceof BinaryExpression ) ) {
- logInternalErrorAndSetFlag();
- return;
- }
- // child's lhs operand
- LogicalExpression leftChild =
- ((BinaryExpression)childExpr).getLhs();
- // child's rhs operand
- LogicalExpression rightChild =
- ((BinaryExpression)childExpr).getRhs();
-
- plan.disconnect( childExpr, leftChild );
- plan.disconnect( childExpr, rightChild );
-
- if(replaceSide == Side.LEFT) {
- // remove left child and replace childExpr with its right child
- remove( leftChild );
- replace(childExpr, rightChild);
- } else if(replaceSide == Side.RIGHT){
- // remove right child and replace childExpr with its left child
- remove(rightChild);
- replace(childExpr, leftChild);
- }else {
- logInternalErrorAndSetFlag();
- return;
- }
- //reset
- replaceSide = Side.NONE;
- sawKey = false;
-
- }
-
- private void replace(Operator oldOp, Operator newOp) throws FrontendException {
- List<Operator> grandParents = plan.getPredecessors( oldOp );
- if( grandParents == null || grandParents.size() == 0 ) {
- plan.remove( oldOp );
- return;
- }
- Operator grandParent = plan.getPredecessors( oldOp ).get( 0 );
- Pair<Integer, Integer> pair = plan.disconnect( grandParent, oldOp );
- plan.add( newOp );
- plan.connect( grandParent, pair.first, newOp, pair.second );
- plan.remove( oldOp );
- }
-
- /**
- * @param op
- * @throws IOException
- * @throws IOException
- * @throws IOException
- */
- private void remove(LogicalExpression op) throws FrontendException {
- pColConditions.add( getExpression( op ) );
- removeTree( op );
- }
-
- /**
- * Assume that the given operator is already disconnected from its predecessors.
- * @param op
- * @throws FrontendException
- */
- private void removeTree(Operator op) throws FrontendException {
- List<Operator> succs = plan.getSuccessors( op );
- if( succs == null ) {
- plan.remove( op );
- return;
- }
-
- Operator[] children = new Operator[succs.size()];
- for( int i = 0; i < succs.size(); i++ ) {
- children[i] = succs.get(i);
- }
-
- for( Operator succ : children ) {
- plan.disconnect( op, succ );
- removeTree( succ );
- }
-
- plan.remove( op );
- }
-
- public Expression getExpression(LogicalExpression op) throws FrontendException
- {
- if(op instanceof ConstantExpression) {
- ConstantExpression constExpr =(ConstantExpression)op ;
- return new Expression.Const( constExpr.getValue() );
- } else if (op instanceof ProjectExpression) {
- ProjectExpression projExpr = (ProjectExpression)op;
- String fieldName = projExpr.getFieldSchema().alias;
+ /**
+ * @param op
+ * @throws IOException
+ * @throws IOException
+ * @throws IOException
+ */
+ private void remove(LogicalExpression op) throws FrontendException {
+ pColConditions.add( getExpression( op ) );
+ removeTree( op );
+ }
+
+ /**
+ * Assume that the given operator is already disconnected from its predecessors.
+ * @param op
+ * @throws FrontendException
+ */
+ private void removeTree(Operator op) throws FrontendException {
+ List<Operator> succs = plan.getSuccessors( op );
+ if( succs == null ) {
+ plan.remove( op );
+ return;
+ }
+
+ Operator[] children = new Operator[succs.size()];
+ for( int i = 0; i < succs.size(); i++ ) {
+ children[i] = succs.get(i);
+ }
+
+ for( Operator succ : children ) {
+ plan.disconnect( op, succ );
+ removeTree( succ );
+ }
+
+ plan.remove( op );
+ }
+
+ public Expression getExpression(LogicalExpression op) throws FrontendException
+ {
+ if(op instanceof ConstantExpression) {
+ ConstantExpression constExpr =(ConstantExpression)op ;
+ return new Expression.Const( constExpr.getValue() );
+ } else if (op instanceof ProjectExpression) {
+ ProjectExpression projExpr = (ProjectExpression)op;
+ String fieldName = projExpr.getFieldSchema().alias;
return new Expression.Column(fieldName);
} else {
- if( !( op instanceof BinaryExpression ) ) {
- logInternalErrorAndSetFlag();
- return null;
- }
- BinaryExpression binOp = (BinaryExpression)op;
- if(binOp instanceof AddExpression) {
- return getExpression( binOp, OpType.OP_PLUS );
- } else if(binOp instanceof SubtractExpression) {
- return getExpression(binOp, OpType.OP_MINUS);
- } else if(binOp instanceof MultiplyExpression) {
- return getExpression(binOp, OpType.OP_TIMES);
- } else if(binOp instanceof DivideExpression) {
- return getExpression(binOp, OpType.OP_DIV);
- } else if(binOp instanceof ModExpression) {
- return getExpression(binOp, OpType.OP_MOD);
- } else if(binOp instanceof AndExpression) {
- return getExpression(binOp, OpType.OP_AND);
- } else if(binOp instanceof OrExpression) {
- return getExpression(binOp, OpType.OP_OR);
- } else if(binOp instanceof EqualExpression) {
- return getExpression(binOp, OpType.OP_EQ);
- } else if(binOp instanceof NotEqualExpression) {
- return getExpression(binOp, OpType.OP_NE);
- } else if(binOp instanceof GreaterThanExpression) {
- return getExpression(binOp, OpType.OP_GT);
- } else if(binOp instanceof GreaterThanEqualExpression) {
- return getExpression(binOp, OpType.OP_GE);
- } else if(binOp instanceof LessThanExpression) {
- return getExpression(binOp, OpType.OP_LT);
- } else if(binOp instanceof LessThanEqualExpression) {
- return getExpression(binOp, OpType.OP_LE);
- } else if(binOp instanceof RegexExpression) {
- return getExpression(binOp, OpType.OP_MATCH);
- } else {
- logInternalErrorAndSetFlag();
- }
- }
- return null;
- }
+ if( !( op instanceof BinaryExpression ) ) {
+ logInternalErrorAndSetFlag();
+ return null;
+ }
+ BinaryExpression binOp = (BinaryExpression)op;
+ if(binOp instanceof AddExpression) {
+ return getExpression( binOp, OpType.OP_PLUS );
+ } else if(binOp instanceof SubtractExpression) {
+ return getExpression(binOp, OpType.OP_MINUS);
+ } else if(binOp instanceof MultiplyExpression) {
+ return getExpression(binOp, OpType.OP_TIMES);
+ } else if(binOp instanceof DivideExpression) {
+ return getExpression(binOp, OpType.OP_DIV);
+ } else if(binOp instanceof ModExpression) {
+ return getExpression(binOp, OpType.OP_MOD);
+ } else if(binOp instanceof AndExpression) {
+ return getExpression(binOp, OpType.OP_AND);
+ } else if(binOp instanceof OrExpression) {
+ return getExpression(binOp, OpType.OP_OR);
+ } else if(binOp instanceof EqualExpression) {
+ return getExpression(binOp, OpType.OP_EQ);
+ } else if(binOp instanceof NotEqualExpression) {
+ return getExpression(binOp, OpType.OP_NE);
+ } else if(binOp instanceof GreaterThanExpression) {
+ return getExpression(binOp, OpType.OP_GT);
+ } else if(binOp instanceof GreaterThanEqualExpression) {
+ return getExpression(binOp, OpType.OP_GE);
+ } else if(binOp instanceof LessThanExpression) {
+ return getExpression(binOp, OpType.OP_LT);
+ } else if(binOp instanceof LessThanEqualExpression) {
+ return getExpression(binOp, OpType.OP_LE);
+ } else if(binOp instanceof RegexExpression) {
+ return getExpression(binOp, OpType.OP_MATCH);
+ } else {
+ logInternalErrorAndSetFlag();
+ }
+ }
+ return null;
+ }
private Expression getExpression(BinaryExpression binOp, OpType
opType) throws FrontendException {
return new Expression.BinaryExpression(getExpression(binOp.getLhs())
- ,getExpression(binOp.getRhs()), opType);
+ , getExpression(binOp.getRhs()), opType);
}
private void logInternalErrorAndSetFlag() throws FrontendException {
@@ -488,70 +528,70 @@ public class PColFilterExtractor extends
canPushDown = false;
}
- // this might get called from some visit() - in that case, delegate to
- // the other visit()s which we have defined here
- private void visit(LogicalExpression op) throws FrontendException {
- if(op instanceof ProjectExpression) {
- visit((ProjectExpression)op);
- } else if (op instanceof BinaryExpression) {
- visit((BinaryExpression)op);
- } else if (op instanceof CastExpression) {
- visit((CastExpression)op);
- } else if (op instanceof BinCondExpression) {
- visit((BinCondExpression)op);
- } else if (op instanceof UserFuncExpression) {
- visit((UserFuncExpression)op);
- } else if (op instanceof IsNullExpression) {
- visit((IsNullExpression)op);
- } else if( op instanceof NotExpression ) {
- visit( (NotExpression)op );
- } else if( op instanceof RegexExpression ) {
- visit( (RegexExpression)op );
+ // this might get called from some visit() - in that case, delegate to
+ // the other visit()s which we have defined here
+ private void visit(LogicalExpression op) throws FrontendException {
+ if(op instanceof ProjectExpression) {
+ visit((ProjectExpression)op);
+ } else if (op instanceof BinaryExpression) {
+ visit((BinaryExpression)op);
+ } else if (op instanceof CastExpression) {
+ visit((CastExpression)op);
+ } else if (op instanceof BinCondExpression) {
+ visit((BinCondExpression)op);
+ } else if (op instanceof UserFuncExpression) {
+ visit((UserFuncExpression)op);
+ } else if (op instanceof IsNullExpression) {
+ visit((IsNullExpression)op);
+ } else if( op instanceof NotExpression ) {
+ visit( (NotExpression)op );
+ } else if( op instanceof RegexExpression ) {
+ visit( (RegexExpression)op );
} else if (op instanceof MapLookupExpression) {
visit((MapLookupExpression) op);
} else if (op instanceof DereferenceExpression) {
visit((DereferenceExpression) op);
}
- }
+ }
+
+ // some specific operators which are of interest to catch some
+ // unsupported scenarios
+ private void visit(CastExpression cast) throws FrontendException {
+ visit(cast.getExpression());
+ }
+
+ private void visit(NotExpression not) throws FrontendException {
+ visit(not.getExpression());
+ }
+
+ private void visit(RegexExpression regexp) throws FrontendException {
+ visit((BinaryExpression)regexp);
+ }
+
+ private void visit(BinCondExpression binCond) throws FrontendException {
+ visit(binCond.getCondition());
+ visit(binCond.getLhs());
+ visit(binCond.getRhs());
+ }
+
+ private void visit(UserFuncExpression udf) throws FrontendException {
+ for (LogicalExpression op : udf.getArguments()) {
+ visit(op);
+ }
+ }
- // some specific operators which are of interest to catch some
- // unsupported scenarios
- private void visit(CastExpression cast) throws FrontendException {
- visit(cast.getExpression());
- }
-
- private void visit(NotExpression not) throws FrontendException {
- visit(not.getExpression());
- }
-
- private void visit(RegexExpression regexp) throws FrontendException {
- visit((BinaryExpression)regexp);
- }
-
- private void visit(BinCondExpression binCond) throws FrontendException {
- visit(binCond.getCondition());
- visit(binCond.getLhs());
- visit(binCond.getRhs());
- }
-
- private void visit(UserFuncExpression udf) throws FrontendException {
- for (LogicalExpression op : udf.getArguments()) {
- visit(op);
- }
- }
-
- private void visit(IsNullExpression isNull) throws FrontendException {
- visit(isNull.getExpression());
- }
-
- private void visit(MapLookupExpression mapLookup) throws FrontendException {
+ private void visit(IsNullExpression isNull) throws FrontendException {
+ visit(isNull.getExpression());
+ }
+
+ private void visit(MapLookupExpression mapLookup) throws FrontendException {
visit(mapLookup.getMap());
}
-
- private void visit(DereferenceExpression deref) throws FrontendException {
+
+ private void visit(DereferenceExpression deref) throws FrontendException {
visit(deref.getReferredExpression());
}
-
+
public boolean canPushDown() {
return canPushDown;
}
Modified: pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java?rev=1509455&r1=1509454&r2=1509455&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPartitionFilterPushDown.java Thu Aug 1 22:17:32 2013
@@ -720,6 +720,99 @@ public class TestPartitionFilterPushDown
test(q, Arrays.asList("srcid"), null, "(name matches 'foo*')");
}
+ /**
+ * Test PIG-3395 Large filter expression makes Pig hang
+ * @throws Exception
+ */
+ @Test
+ public void testLargeAndOrCondition() throws Exception {
+ String q = query + "b = filter a by " +
+ "(srcid == 1 and mrkt == '2' and dstid == 3) " +
+ "or (srcid == 4 and mrkt == '5' and dstid == 6) " +
+ "or (srcid == 7 and mrkt == '8' and dstid == 9) " +
+ "or (srcid == 10 and mrkt == '11' and dstid == 12) " +
+ "or (srcid == 13 and mrkt == '14' and dstid == 15) " +
+ "or (srcid == 16 and mrkt == '17' and dstid == 18) " +
+ "or (srcid == 19 and mrkt == '20' and dstid == 21) " +
+ "or (srcid == 22 and mrkt == '23' and dstid == 24) " +
+ "or (srcid == 25 and mrkt == '26' and dstid == 27) " +
+ "or (srcid == 28 and mrkt == '29' and dstid == 30) " +
+ "or (srcid == 31 and mrkt == '32' and dstid == 33) " +
+ "or (srcid == 34 and mrkt == '35' and dstid == 36) " +
+ "or (srcid == 37 and mrkt == '38' and dstid == 39) " +
+ "or (srcid == 40 and mrkt == '41' and dstid == 42) " +
+ "or (srcid == 43 and mrkt == '44' and dstid == 45) " +
+ "or (srcid == 46 and mrkt == '47' and dstid == 48) " +
+ "or (srcid == 49 and mrkt == '50' and dstid == 51) " +
+ "or (srcid == 52 and mrkt == '53' and dstid == 54) " +
+ "or (srcid == 55 and mrkt == '56' and dstid == 57) " +
+ "or (srcid == 58 and mrkt == '59' and dstid == 60) " +
+ "or (srcid == 61 and mrkt == '62' and dstid == 63) " +
+ "or (srcid == 64 and mrkt == '65' and dstid == 66) " +
+ "or (srcid == 67 and mrkt == '68' and dstid == 69);" +
+ "store b into 'out';";
+ test(q, Arrays.asList("srcid", "mrkt", "dstid"),
+ "(((((((((((((((((((((((((srcid == 1) and (mrkt == '2')) and (dstid == 3)) " +
+ "or (((srcid == 4) and (mrkt == '5')) and (dstid == 6))) " +
+ "or (((srcid == 7) and (mrkt == '8')) and (dstid == 9))) " +
+ "or (((srcid == 10) and (mrkt == '11')) and (dstid == 12))) " +
+ "or (((srcid == 13) and (mrkt == '14')) and (dstid == 15))) " +
+ "or (((srcid == 16) and (mrkt == '17')) and (dstid == 18))) " +
+ "or (((srcid == 19) and (mrkt == '20')) and (dstid == 21))) " +
+ "or (((srcid == 22) and (mrkt == '23')) and (dstid == 24))) " +
+ "or (((srcid == 25) and (mrkt == '26')) and (dstid == 27))) " +
+ "or (((srcid == 28) and (mrkt == '29')) and (dstid == 30))) " +
+ "or (((srcid == 31) and (mrkt == '32')) and (dstid == 33))) " +
+ "or (((srcid == 34) and (mrkt == '35')) and (dstid == 36))) " +
+ "or (((srcid == 37) and (mrkt == '38')) and (dstid == 39))) " +
+ "or (((srcid == 40) and (mrkt == '41')) and (dstid == 42))) " +
+ "or (((srcid == 43) and (mrkt == '44')) and (dstid == 45))) " +
+ "or (((srcid == 46) and (mrkt == '47')) and (dstid == 48))) " +
+ "or (((srcid == 49) and (mrkt == '50')) and (dstid == 51))) " +
+ "or (((srcid == 52) and (mrkt == '53')) and (dstid == 54))) " +
+ "or (((srcid == 55) and (mrkt == '56')) and (dstid == 57))) " +
+ "or (((srcid == 58) and (mrkt == '59')) and (dstid == 60))) " +
+ "or (((srcid == 61) and (mrkt == '62')) and (dstid == 63))) " +
+ "or (((srcid == 64) and (mrkt == '65')) and (dstid == 66))) " +
+ "or (((srcid == 67) and (mrkt == '68')) and (dstid == 69)))",
+ null);
+ }
+
+ // A UDF expression should cause the entire filter to be rejected
+ @Test
+ public void testAndOrConditionMixedWithUdfExpr() throws Exception {
+ String q = query + "b = filter a by " +
+ "(UPPER(name) == 'FOO')" +
+ "or (srcid == 1 and mrkt == '2' and dstid == 3) " +
+ "or (srcid == 4 and mrkt == '5' and dstid == 6) " +
+ "or (srcid == 7 and mrkt == '8' and dstid == 9);" +
+ "store b into 'out';";
+ negativeTest(q, Arrays.asList("srcid", "mrkt", "dstid"));
+ }
+
+ // A cast expression should cause the entire filter to be rejected
+ @Test
+ public void testAndOrConditionMixedWithCastExpr() throws Exception {
+ String q = query + "b = filter a by " +
+ "(srcid == 1 and mrkt == '2' and dstid == 3) " +
+ "or (srcid == 4 and (int)mrkt == 5 and dstid == 6) " +
+ "or (srcid == 7 and mrkt == '8' and dstid == 9);" +
+ "store b into 'out';";
+ negativeTest(q, Arrays.asList("srcid", "mrkt", "dstid"));
+ }
+
+ // An "is null" expression should cause the entire filter to be rejected
+ @Test
+ public void testAndOrConditionMixedWithNullExpr() throws Exception {
+ String q = query + "b = filter a by " +
+ "(srcid == 1 and mrkt == '2' and dstid == 3) " +
+ "or (srcid == 4 and mrkt == '5' and dstid == 6) " +
+ "or (srcid == 7 and mrkt == '8' and dstid == 9) " +
+ "or (name is null);" +
+ "store b into 'out';";
+ negativeTest(q, Arrays.asList("srcid", "mrkt", "dstid"));
+ }
+
//// helper methods ///////
private PColFilterExtractor test(String query, List<String> partitionCols,
@@ -773,6 +866,20 @@ public class TestPartitionFilterPushDown
return pColExtractor;
}
+ // The filter cannot be pushed down unless it meets certain conditions. When it
+ // does not, PColFilterExtractor.getPColCondition() should return null.
+ private void negativeTest(String query, List<String> partitionCols) throws Exception {
+ PigServer pigServer = new PigServer( pc );
+ LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
+ Operator op = newLogicalPlan.getSinks().get(0);
+ LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
+ PColFilterExtractor pColExtractor = new PColFilterExtractor(
+ filter.getFilterPlan(), partitionCols);
+ pColExtractor.visit();
+ Assert.assertFalse(pColExtractor.canPushDown());
+ Assert.assertNull(pColExtractor.getPColCondition());
+ }
+
private void negativeTest(String query, List<String> partitionCols,
int expectedErrorCode) throws Exception {
PigServer pigServer = new PigServer( pc );