Posted to commits@hive.apache.org by ha...@apache.org on 2014/11/05 23:41:23 UTC

svn commit: r1636995 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/plan/ java/org/apache/hadoop/hive/ql/ppd/ test/queries/clientpositive/ test/results/clientpositive/

Author: hashutosh
Date: Wed Nov  5 22:41:22 2014
New Revision: 1636995

URL: http://svn.apache.org/r1636995
Log:
HIVE-7111 : Extend join transitivity PPD to non-column expressions (Navis via Ashutosh Chauhan)

Added:
    hive/trunk/ql/src/test/queries/clientpositive/auto_join33.q
    hive/trunk/ql/src/test/results/clientpositive/auto_join33.q.out
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java
    hive/trunk/ql/src/test/results/clientpositive/vector_mapjoin_reduce.q.out

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java?rev=1636995&r1=1636994&r2=1636995&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java Wed Nov  5 22:41:22 2014
@@ -193,6 +193,7 @@ public class ExprNodeDescUtils {
   /**
    * Convert expressions in current operator to those in terminal operator, which
    * is an ancestor of current or null (back to top operator).
+   * Possibly contain null values for non-traceable exprs
    */
   public static ArrayList<ExprNodeDesc> backtrack(List<ExprNodeDesc> sources,
       Operator<?> current, Operator<?> terminal) throws SemanticException {
@@ -396,29 +397,34 @@ public class ExprNodeDescUtils {
 	 * Get Map of ExprNodeColumnDesc HashCode to ExprNodeColumnDesc.
 	 * 
 	 * @param exprDesc
-	 * @param hashCodeTocolumnDescMap
+	 * @param hashCodeToColumnDescMap
 	 *            Assumption: If two ExprNodeColumnDesc have same hash code then
 	 *            they are logically referring to same projection
 	 */
 	public static void getExprNodeColumnDesc(ExprNodeDesc exprDesc,
-			Map<Integer, ExprNodeDesc> hashCodeTocolumnDescMap) {
+			Map<Integer, ExprNodeDesc> hashCodeToColumnDescMap) {
 		if (exprDesc instanceof ExprNodeColumnDesc) {
-			hashCodeTocolumnDescMap.put(
-					((ExprNodeColumnDesc) exprDesc).hashCode(),
-					((ExprNodeColumnDesc) exprDesc));
+			hashCodeToColumnDescMap.put(exprDesc.hashCode(), exprDesc);
 		} else if (exprDesc instanceof ExprNodeColumnListDesc) {
-			for (ExprNodeDesc child : ((ExprNodeColumnListDesc) exprDesc)
-					.getChildren()) {
-				getExprNodeColumnDesc(child, hashCodeTocolumnDescMap);
+			for (ExprNodeDesc child : exprDesc.getChildren()) {
+				getExprNodeColumnDesc(child, hashCodeToColumnDescMap);
 			}
 		} else if (exprDesc instanceof ExprNodeGenericFuncDesc) {
-			for (ExprNodeDesc child : ((ExprNodeGenericFuncDesc) exprDesc)
-					.getChildren()) {
-				getExprNodeColumnDesc(child, hashCodeTocolumnDescMap);
+			for (ExprNodeDesc child : exprDesc.getChildren()) {
+				getExprNodeColumnDesc(child, hashCodeToColumnDescMap);
 			}
 		} else if (exprDesc instanceof ExprNodeFieldDesc) {
 			getExprNodeColumnDesc(((ExprNodeFieldDesc) exprDesc).getDesc(),
-					hashCodeTocolumnDescMap);
+					hashCodeToColumnDescMap);
 		}
 	}
+
+  public static boolean isAllConstants(List<ExprNodeDesc> value) {
+    for (ExprNodeDesc expr : value) {
+      if (!(expr instanceof ExprNodeConstantDesc)) {
+        return false;
+      }
+    }
+    return true;
+  }
 }

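Note: the new ExprNodeDescUtils.isAllConstants(List<ExprNodeDesc>) helper above returns true only when every expression in the list is a literal; the predicate-pushdown code further down uses it to recognize predicates that are keyed to no alias (for example a constant FALSE) and may therefore be propagated to any join input. A minimal, self-contained sketch of the same check, using hypothetical stand-in types rather than Hive's ExprNodeDesc hierarchy:

    // Sketch only: Expr/ConstantExpr/ColumnExpr are stand-ins for Hive's
    // ExprNodeDesc, ExprNodeConstantDesc and ExprNodeColumnDesc.
    import java.util.Arrays;
    import java.util.List;

    class IsAllConstantsSketch {
      interface Expr {}
      static class ConstantExpr implements Expr { final Object value; ConstantExpr(Object v) { value = v; } }
      static class ColumnExpr implements Expr { final String column; ColumnExpr(String c) { column = c; } }

      // Same shape as the new isAllConstants: true only if every
      // expression in the list is a constant literal.
      static boolean isAllConstants(List<Expr> exprs) {
        for (Expr e : exprs) {
          if (!(e instanceof ConstantExpr)) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        System.out.println(isAllConstants(Arrays.<Expr>asList(new ConstantExpr(false)))); // true
        System.out.println(isAllConstants(Arrays.<Expr>asList(new ColumnExpr("key"))));   // false
      }
    }
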
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java?rev=1636995&r1=1636994&r2=1636995&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java Wed Nov  5 22:41:22 2014
@@ -19,16 +19,13 @@ package org.apache.hadoop.hive.ql.ppd;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -143,7 +140,7 @@ public class ExprWalkerInfo implements N
    * @return converted expression for give node. If there is none then returns
    *         null.
    */
-  public ExprNodeDesc getConvertedNode(Node nd) {
+  public ExprNodeDesc getConvertedNode(ExprNodeDesc nd) {
     ExprInfo ei = exprInfoMap.get(nd);
     if (ei == null) {
       return null;
@@ -238,11 +235,11 @@ public class ExprWalkerInfo implements N
    * @param expr
    */
   public void addFinalCandidate(ExprNodeDesc expr) {
-    String alias = getAlias(expr);
-    if (pushdownPreds.get(alias) == null) {
-      pushdownPreds.put(alias, new ArrayList<ExprNodeDesc>());
-    }
-    pushdownPreds.get(alias).add(expr);
+    addFinalCandidate(getAlias(expr), expr);
+  }
+
+  public void addFinalCandidate(String alias, ExprNodeDesc expr) {
+    getPushdownPreds(alias).add(expr);
   }
 
   /**
@@ -252,10 +249,7 @@ public class ExprWalkerInfo implements N
    * @param pushDowns
    */
   public void addPushDowns(String alias, List<ExprNodeDesc> pushDowns) {
-    if (pushdownPreds.get(alias) == null) {
-      pushdownPreds.put(alias, new ArrayList<ExprNodeDesc>());
-    }
-    pushdownPreds.get(alias).addAll(pushDowns);
+    getPushdownPreds(alias).addAll(pushDowns);
   }
 
   /**
@@ -269,6 +263,26 @@ public class ExprWalkerInfo implements N
     return pushdownPreds;
   }
 
+  private List<ExprNodeDesc> getPushdownPreds(String alias) {
+    List<ExprNodeDesc> predicates = pushdownPreds.get(alias);
+    if (predicates == null) {
+      pushdownPreds.put(alias, predicates = new ArrayList<ExprNodeDesc>());
+    }
+    return predicates;
+  }
+
+  public boolean hasAnyCandidates() {
+    if (pushdownPreds == null || pushdownPreds.isEmpty()) {
+      return false;
+    }
+    for (List<ExprNodeDesc> exprs : pushdownPreds.values()) {
+      if (!exprs.isEmpty()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Adds the specified expr as a non-final candidate
    *

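Note: the ExprWalkerInfo changes above replace the repeated null-check-then-put idiom with a private getPushdownPreds(alias) that lazily creates the per-alias list, and add hasAnyCandidates() so callers can cheaply tell whether any alias actually has a pending pushdown predicate. A hedged sketch of the same registry pattern, with plain strings standing in for ExprNodeDesc:

    // Sketch only: a simplified alias -> predicate-list registry mirroring
    // ExprWalkerInfo.getPushdownPreds / addFinalCandidate / hasAnyCandidates.
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class PushdownRegistrySketch {
      private final Map<String, List<String>> pushdownPreds = new HashMap<String, List<String>>();

      // Lazily create the list for an alias, as the new private helper does.
      private List<String> getPushdownPreds(String alias) {
        List<String> predicates = pushdownPreds.get(alias);
        if (predicates == null) {
          pushdownPreds.put(alias, predicates = new ArrayList<String>());
        }
        return predicates;
      }

      public void addFinalCandidate(String alias, String expr) {
        getPushdownPreds(alias).add(expr);
      }

      // True only if some alias actually has a pending predicate.
      public boolean hasAnyCandidates() {
        for (List<String> exprs : pushdownPreds.values()) {
          if (!exprs.isEmpty()) {
            return true;
          }
        }
        return false;
      }
    }
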
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java?rev=1636995&r1=1636994&r2=1636995&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java Wed Nov  5 22:41:22 2014
@@ -31,7 +31,6 @@ import java.util.Stack;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
@@ -48,9 +47,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.ASTNode;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.HiveParser;
 import org.apache.hadoop.hive.ql.parse.OpParseContext;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -342,14 +338,12 @@ public final class OpProcFactory {
       super.process(nd, stack, procCtx, nodeOutputs);
       OpWalkerInfo owi = (OpWalkerInfo) procCtx;
       ExprWalkerInfo prunedPred = owi.getPrunedPreds((Operator<? extends OperatorDesc>) nd);
-      if (prunedPred == null) {
+      if (prunedPred == null || !prunedPred.hasAnyCandidates()) {
         return null;
       }
       Map<String, List<ExprNodeDesc>> candidates = prunedPred.getFinalCandidates();
-      if (candidates != null && !candidates.isEmpty()) {
-        createFilter((Operator)nd, prunedPred, owi);
-        candidates.clear();
-      }
+      createFilter((Operator)nd, prunedPred, owi);
+      candidates.clear();
       return null;
     }
 
@@ -476,7 +470,12 @@ public final class OpProcFactory {
         Set<String> toRemove = new HashSet<String>();
         // we don't push down any expressions that refer to aliases that can;t
         // be pushed down per getQualifiedAliases
-        for (String key : prunePreds.getFinalCandidates().keySet()) {
+        for (Entry<String, List<ExprNodeDesc>> entry : prunePreds.getFinalCandidates().entrySet()) {
+          String key = entry.getKey();
+          List<ExprNodeDesc> value = entry.getValue();
+          if (key == null && ExprNodeDescUtils.isAllConstants(value)) {
+            continue;   // propagate constants
+          }
           if (!aliases.contains(key)) {
             toRemove.add(key);
           }
@@ -517,199 +516,6 @@ public final class OpProcFactory {
       return getQualifiedAliases((JoinOperator) nd, owi.getRowResolver(nd));
     }
 
-    @Override
-    protected Object handlePredicates(Node nd, ExprWalkerInfo prunePreds, OpWalkerInfo owi)
-        throws SemanticException {
-      if (HiveConf.getBoolVar(owi.getParseContext().getConf(),
-          HiveConf.ConfVars.HIVEPPDRECOGNIZETRANSITIVITY)) {
-        applyFilterTransitivity((JoinOperator) nd, owi);
-      }
-      return super.handlePredicates(nd, prunePreds, owi);
-    }
-
-    /**
-     * Adds additional pushdown predicates for a join operator by replicating
-     * filters transitively over all the equijoin conditions.
-     *
-     * If we have a predicate "t.col=1" and the equijoin conditions
-     * "t.col=s.col" and "t.col=u.col", we add the filters "s.col=1" and
-     * "u.col=1". Note that this does not depend on the types of joins (ie.
-     * inner, left/right/full outer) between the tables s, t and u because if
-     * a predicate, eg. "t.col=1" is present in getFinalCandidates() at this
-     * point, we have already verified that it can be pushed down, so any rows
-     * emitted must satisfy s.col=t.col=u.col=1 and replicating the filters
-     * like this is ok.
-     */
-    private void applyFilterTransitivity(JoinOperator nd, OpWalkerInfo owi)
-        throws SemanticException {
-      ExprWalkerInfo prunePreds =
-          owi.getPrunedPreds(nd);
-      if (prunePreds != null) {
-        // We want to use the row resolvers of the parents of the join op
-        // because the rowresolver refers to the output columns of an operator
-        // and the filters at this point refer to the input columns of the join
-        // operator.
-        Map<String, RowResolver> aliasToRR =
-            new HashMap<String, RowResolver>();
-        for (Operator<? extends OperatorDesc> o : (nd).getParentOperators()) {
-          for (String alias : owi.getRowResolver(o).getTableNames()){
-            aliasToRR.put(alias, owi.getRowResolver(o));
-          }
-        }
-
-        // eqExpressions is a list of ArrayList<ASTNode>'s, one for each table
-        // in the join. Then for each i, j and k, the join condition is that
-        // eqExpressions[i][k]=eqExpressions[j][k] (*) (ie. the columns referenced
-        // by the corresponding ASTNodes are equal). For example, if the query
-        // was SELECT * FROM a join b on a.col=b.col and a.col2=b.col2 left
-        // outer join c on b.col=c.col and b.col2=c.col2 WHERE c.col=1,
-        // eqExpressions would be [[a.col1, a.col2], [b.col1, b.col2],
-        // [c.col1, c.col2]].
-        //
-        // numEqualities is the number of equal columns in each equality
-        // "chain" and numColumns is the number of such chains.
-        //
-        // Note that (*) is guaranteed to be true for the
-        // join operator: if the equijoin condititions can't be expressed in
-        // these equal-length lists of equal columns (for example if we had the
-        // query SELECT * FROM a join b on a.col=b.col and a.col2=b.col2 left
-        // outer join c on b.col=c.col), more than one join operator is used.
-        ArrayList<ArrayList<ASTNode>> eqExpressions =
-            owi.getParseContext().getJoinContext().get(nd).getExpressions();
-        int numColumns = eqExpressions.size();
-        int numEqualities = eqExpressions.get(0).size();
-
-        // oldFilters contains the filters to be pushed down
-        Map<String, List<ExprNodeDesc>> oldFilters =
-            prunePreds.getFinalCandidates();
-        Map<String, List<ExprNodeDesc>> newFilters =
-            new HashMap<String, List<ExprNodeDesc>>();
-
-        // We loop through for each chain of equalities
-        for (int i=0; i<numEqualities; i++) {
-          // equalColumns[i] is the ColumnInfo corresponding to the ith term
-          // of the equality or null if the term is not a simple column
-          // reference
-          ColumnInfo[] equalColumns=new ColumnInfo[numColumns];
-          for (int j=0; j<numColumns; j++) {
-            equalColumns[j] =
-                getColumnInfoFromAST(eqExpressions.get(j).get(i), aliasToRR);
-          }
-          for (int j=0; j<numColumns; j++) {
-            for (int k=0; k<numColumns; k++) {
-              if (j != k && equalColumns[j]!= null
-                  && equalColumns[k] != null) {
-                // terms j and k in the equality chain are simple columns,
-                // so we can replace instances of column j with column k
-                // in the filter and ad the replicated filter.
-                ColumnInfo left = equalColumns[j];
-                ColumnInfo right = equalColumns[k];
-                if (oldFilters.get(left.getTabAlias()) != null){
-                  for (ExprNodeDesc expr :
-                    oldFilters.get(left.getTabAlias())) {
-                    // Only replicate the filter if there is exactly one column
-                    // referenced
-                    Set<String> colsreferenced =
-                        new HashSet<String>(expr.getCols());
-                    if (colsreferenced.size() == 1
-                        && colsreferenced.contains(left.getInternalName())){
-                      ExprNodeDesc newexpr = expr.clone();
-                      // Replace the column reference in the filter
-                      replaceColumnReference(newexpr, left.getInternalName(),
-                          right.getInternalName());
-                      if (newFilters.get(right.getTabAlias()) == null) {
-                        newFilters.put(right.getTabAlias(),
-                            new ArrayList<ExprNodeDesc>());
-                      }
-                      newFilters.get(right.getTabAlias()).add(newexpr);
-                    }
-                  }
-                }
-              }
-            }
-          }
-        }
-        // Push where false filter transitively
-        Map<String,List<ExprNodeDesc>> candidates = prunePreds.getNonFinalCandidates();
-        List<ExprNodeDesc> exprs;
-        // where false is not associated with any alias in candidates
-        if (null != candidates && candidates.get(null) != null && ((exprs = candidates.get(null)) != null)) {
-          Iterator<ExprNodeDesc> itr = exprs.iterator();
-          while (itr.hasNext()) {
-            ExprNodeDesc expr = itr.next();
-            if (expr instanceof ExprNodeConstantDesc && Boolean.FALSE.equals(((ExprNodeConstantDesc)expr).getValue())) {
-              // push this 'where false' expr to all aliases
-              for (String alias : aliasToRR.keySet()) {
-                List<ExprNodeDesc> pushedFilters = newFilters.get(alias);
-                if (null == pushedFilters) {
-                  newFilters.put(alias, new ArrayList<ExprNodeDesc>());
-
-                }
-                newFilters.get(alias).add(expr);
-              }
-              // this filter is pushed, we can remove it from non-final candidates.
-              itr.remove();
-            }
-          }
-        }
-        for (Entry<String, List<ExprNodeDesc>> aliasToFilters
-            : newFilters.entrySet()){
-          owi.getPrunedPreds(nd)
-            .addPushDowns(aliasToFilters.getKey(), aliasToFilters.getValue());
-        }
-      }
-    }
-
-    /**
-     * Replaces the ColumnInfo for the column referred to by an ASTNode
-     * representing "table.column" or null if the ASTNode is not in that form
-     */
-    private ColumnInfo getColumnInfoFromAST(ASTNode nd,
-        Map<String, RowResolver> aliastoRR) throws SemanticException {
-      // this bit is messy since we are parsing an ASTNode at this point
-      if (nd.getType()==HiveParser.DOT) {
-        if (nd.getChildCount()==2) {
-          if (nd.getChild(0).getType()==HiveParser.TOK_TABLE_OR_COL
-              && nd.getChild(0).getChildCount()==1
-              && nd.getChild(1).getType()==HiveParser.Identifier){
-            // We unescape the identifiers and make them lower case--this
-            // really shouldn't be done here, but getExpressions gives us the
-            // raw ASTNodes. The same thing is done in SemanticAnalyzer.
-            // parseJoinCondPopulateAlias().
-            String alias = BaseSemanticAnalyzer.unescapeIdentifier(
-                nd.getChild(0).getChild(0).getText().toLowerCase());
-            String column = BaseSemanticAnalyzer.unescapeIdentifier(
-                nd.getChild(1).getText().toLowerCase());
-            RowResolver rr=aliastoRR.get(alias);
-            if (rr == null) {
-              return null;
-            }
-            return rr.get(alias, column);
-          }
-        }
-      }
-      return null;
-    }
-
-    /**
-     * Replaces all instances of oldColumn with newColumn in the
-     * ExprColumnDesc's of the ExprNodeDesc
-     */
-    private void replaceColumnReference(ExprNodeDesc expr,
-        String oldColumn, String newColumn) {
-      if (expr instanceof ExprNodeColumnDesc) {
-        if (((ExprNodeColumnDesc) expr).getColumn().equals(oldColumn)){
-          ((ExprNodeColumnDesc) expr).setColumn(newColumn);
-        }
-      }
-
-      if (expr.getChildren() != null){
-        for (ExprNodeDesc childexpr : expr.getChildren()) {
-          replaceColumnReference(childexpr, oldColumn, newColumn);
-        }
-      }
-    }
-
     /**
      * Figures out the aliases for whom it is safe to push predicates based on
      * ANSI SQL semantics. The join conditions are left associative so "a
@@ -760,6 +566,86 @@ public final class OpProcFactory {
     }
   }
 
+  public static class ReduceSinkPPD extends DefaultPPD implements NodeProcessor {
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+                          Object... nodeOutputs) throws SemanticException {
+      super.process(nd, stack, procCtx, nodeOutputs);
+      Operator<?> operator = (Operator<?>) nd;
+      OpWalkerInfo owi = (OpWalkerInfo) procCtx;
+      if (operator.getNumChild() == 1 &&
+          operator.getChildOperators().get(0) instanceof JoinOperator) {
+        if (HiveConf.getBoolVar(owi.getParseContext().getConf(),
+            HiveConf.ConfVars.HIVEPPDRECOGNIZETRANSITIVITY)) {
+          JoinOperator child = (JoinOperator) operator.getChildOperators().get(0);
+          int targetPos = child.getParentOperators().indexOf(operator);
+          applyFilterTransitivity(child, targetPos, owi);
+        }
+      }
+      return null;
+    }
+
+    /**
+     * Adds additional pushdown predicates for a join operator by replicating
+     * filters transitively over all the equijoin conditions.
+     *
+     * If we have a predicate "t.col=1" and the equijoin conditions
+     * "t.col=s.col" and "t.col=u.col", we add the filters "s.col=1" and
+     * "u.col=1". Note that this does not depend on the types of joins (ie.
+     * inner, left/right/full outer) between the tables s, t and u because if
+     * a predicate, eg. "t.col=1" is present in getFinalCandidates() at this
+     * point, we have already verified that it can be pushed down, so any rows
+     * emitted must satisfy s.col=t.col=u.col=1 and replicating the filters
+     * like this is ok.
+     */
+    private void applyFilterTransitivity(JoinOperator join, int targetPos, OpWalkerInfo owi)
+        throws SemanticException {
+
+      ExprWalkerInfo joinPreds = owi.getPrunedPreds(join);
+      if (joinPreds == null || !joinPreds.hasAnyCandidates()) {
+        return;
+      }
+      Map<String, List<ExprNodeDesc>> oldFilters = joinPreds.getFinalCandidates();
+      Map<String, List<ExprNodeDesc>> newFilters = new HashMap<String, List<ExprNodeDesc>>();
+
+      List<Operator<? extends OperatorDesc>> parentOperators = join.getParentOperators();
+
+      ReduceSinkOperator target = (ReduceSinkOperator) parentOperators.get(targetPos);
+      List<ExprNodeDesc> targetKeys = target.getConf().getKeyCols();
+
+      ExprWalkerInfo rsPreds = owi.getPrunedPreds(target);
+      for (int sourcePos = 0; sourcePos < parentOperators.size(); sourcePos++) {
+        ReduceSinkOperator source = (ReduceSinkOperator) parentOperators.get(sourcePos);
+        List<ExprNodeDesc> sourceKeys = source.getConf().getKeyCols();
+        Set<String> sourceAliases = new HashSet<String>(Arrays.asList(source.getInputAliases()));
+        for (Map.Entry<String, List<ExprNodeDesc>> entry : oldFilters.entrySet()) {
+          if (entry.getKey() == null && ExprNodeDescUtils.isAllConstants(entry.getValue())) {
+            // propagate constants
+            for (String targetAlias : target.getInputAliases()) {
+              rsPreds.addPushDowns(targetAlias, entry.getValue());
+            }
+            continue;
+          }
+          if (!sourceAliases.contains(entry.getKey())) {
+            continue;
+          }
+          for (ExprNodeDesc predicate : entry.getValue()) {
+            ExprNodeDesc backtrack = ExprNodeDescUtils.backtrack(predicate, join, source);
+            if (backtrack == null) {
+              continue;
+            }
+            ExprNodeDesc replaced = ExprNodeDescUtils.replace(backtrack, sourceKeys, targetKeys);
+            if (replaced == null) {
+              continue;
+            }
+            for (String targetAlias : target.getInputAliases()) {
+              rsPreds.addFinalCandidate(targetAlias, replaced);
+            }
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Default processor which just merges its children.
    */
@@ -900,11 +786,10 @@ public final class OpProcFactory {
 
   protected static Object createFilter(Operator op,
       ExprWalkerInfo pushDownPreds, OpWalkerInfo owi) {
-    if (pushDownPreds == null || pushDownPreds.getFinalCandidates() == null
-        || pushDownPreds.getFinalCandidates().size() == 0) {
-      return null;
+    if (pushDownPreds != null && pushDownPreds.hasAnyCandidates()) {
+      return createFilter(op, pushDownPreds.getFinalCandidates(), owi);
     }
-    return createFilter(op, pushDownPreds.getFinalCandidates(), owi);
+    return null;
   }
 
   protected static Object createFilter(Operator op,
@@ -1113,6 +998,10 @@ public final class OpProcFactory {
     return new JoinerPPD();
   }
 
+  public static NodeProcessor getRSProc() {
+    return new ReduceSinkPPD();
+  }
+
   private OpProcFactory() {
     // prevent instantiation
   }

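Note: the OpProcFactory change above removes the old AST-based applyFilterTransitivity from JoinerPPD and introduces ReduceSinkPPD, which runs once per ReduceSinkOperator feeding a join. It backtracks each pushable predicate from the join to the source ReduceSink, substitutes the source's key expressions with the target's key expressions via ExprNodeDescUtils.replace, and registers the rewritten predicate as a final candidate for the target's aliases. Because the substitution works on whole key expressions rather than on simple column references parsed out of ASTNodes, it now covers non-column join keys such as key+1. A toy sketch of that substitution step (the Expr tree and replace below are illustrative assumptions, not Hive's actual ExprNodeDesc or replace implementation):

    // Sketch only: a structural "replace source keys with target keys" pass
    // over a toy expression tree; Hive's ExprNodeDescUtils.backtrack/replace
    // are assumed, not reproduced, here.
    import java.util.ArrayList;
    import java.util.List;

    class KeySubstitutionSketch {
      static class Expr {
        final String name;                       // e.g. "col:a.key", "+", "1", "<"
        final List<Expr> children = new ArrayList<Expr>();
        Expr(String name, Expr... kids) { this.name = name; for (Expr k : kids) children.add(k); }
        public String toString() { return children.isEmpty() ? name : name + children; }
      }

      // If expr structurally equals sourceKeys[i], return targetKeys[i];
      // otherwise rebuild the node with substituted children.
      static Expr replace(Expr expr, List<Expr> sourceKeys, List<Expr> targetKeys) {
        for (int i = 0; i < sourceKeys.size(); i++) {
          if (expr.toString().equals(sourceKeys.get(i).toString())) {
            return targetKeys.get(i);
          }
        }
        Expr copy = new Expr(expr.name);
        for (Expr child : expr.children) {
          copy.children.add(replace(child, sourceKeys, targetKeys));
        }
        return copy;
      }

      public static void main(String[] args) {
        // Source RS key: (a.key + 1); target RS key: (b.key + 2).
        Expr sourceKey = new Expr("+", new Expr("col:a.key"), new Expr("1"));
        Expr targetKey = new Expr("+", new Expr("col:b.key"), new Expr("2"));
        // Pushable predicate on the source side: (a.key + 1) < 10.
        Expr predicate = new Expr("<", sourceKey, new Expr("10"));
        List<Expr> src = new ArrayList<Expr>(); src.add(sourceKey);
        List<Expr> tgt = new ArrayList<Expr>(); tgt.add(targetKey);
        // Prints <[+[col:b.key, 2], 10], i.e. (b.key + 2) < 10 rewritten for the other side.
        System.out.println(replace(predicate, src, tgt));
      }
    }

The join key expressions key+1 and key+2 in this sketch mirror what the new auto_join33.q test below exercises.
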
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java?rev=1636995&r1=1636994&r2=1636995&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java Wed Nov  5 22:41:22 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.La
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.PTFOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.ScriptOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.UDTFOperator;
@@ -118,6 +119,9 @@ public class PredicatePushDown implement
     opRules.put(new RuleRegExp("R9",
       LateralViewJoinOperator.getOperatorName() + "%"),
       OpProcFactory.getLVJProc());
+    opRules.put(new RuleRegExp("R10",
+        ReduceSinkOperator.getOperatorName() + "%"),
+        OpProcFactory.getRSProc());
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along

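Note: rule R10 above wires every ReduceSinkOperator (operator name "RS") to OpProcFactory.getRSProc(), so the transitivity pass now fires per ReduceSink rather than inside the join processor. A simplified stand-in for the regex-keyed dispatch described in the comment above (Hive's real RuleRegExp and dispatcher pick the closest matching rule and carry a NodeProcessorCtx; this sketch just fires the first match):

    // Sketch only: simplified rule dispatch keyed by operator-name regex,
    // standing in for Hive's RuleRegExp/DefaultRuleDispatcher machinery.
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.regex.Pattern;

    class RuleDispatchSketch {
      interface NodeProcessor { void process(String operatorName); }

      public static void main(String[] args) {
        Map<Pattern, NodeProcessor> opRules = new LinkedHashMap<Pattern, NodeProcessor>();
        opRules.put(Pattern.compile("FIL.*"), new NodeProcessor() {
          public void process(String op) { System.out.println("FilterPPD handles " + op); }
        });
        opRules.put(Pattern.compile("JOIN.*"), new NodeProcessor() {
          public void process(String op) { System.out.println("JoinerPPD handles " + op); }
        });
        // R10 from this commit: ReduceSink operators get their own processor.
        opRules.put(Pattern.compile("RS.*"), new NodeProcessor() {
          public void process(String op) { System.out.println("ReduceSinkPPD handles " + op); }
        });

        for (String op : new String[] {"TS_0", "FIL_1", "RS_2", "JOIN_3"}) {
          for (Map.Entry<Pattern, NodeProcessor> rule : opRules.entrySet()) {
            if (rule.getKey().matcher(op).matches()) {
              rule.getValue().process(op);   // first matching rule fires in this sketch
              break;
            }
          }
        }
      }
    }
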
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java?rev=1636995&r1=1636994&r2=1636995&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java Wed Nov  5 22:41:22 2014
@@ -30,7 +30,6 @@ import java.util.Stack;
 import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -179,40 +178,6 @@ public class PredicateTransitivePropagat
       return null;
     }
 
-    // calculate filter propagation directions for each alias
-    // L<->R for innner/semi join, L->R for left outer join, R->L for right outer join
-    private int[][] getTargets(CommonJoinOperator<JoinDesc> join) {
-      JoinCondDesc[] conds = join.getConf().getConds();
-
-      int aliases = conds.length + 1;
-      Vectors vector = new Vectors(aliases);
-      for (JoinCondDesc cond : conds) {
-        int left = cond.getLeft();
-        int right = cond.getRight();
-        switch (cond.getType()) {
-          case JoinDesc.INNER_JOIN:
-          case JoinDesc.LEFT_SEMI_JOIN:
-            vector.add(left, right);
-            vector.add(right, left);
-            break;
-          case JoinDesc.LEFT_OUTER_JOIN:
-            vector.add(left, right);
-            break;
-          case JoinDesc.RIGHT_OUTER_JOIN:
-            vector.add(right, left);
-            break;
-          case JoinDesc.FULL_OUTER_JOIN:
-            break;
-        }
-      }
-      int[][] result = new int[aliases][];
-      for (int pos = 0 ; pos < aliases; pos++) {
-        // find all targets recursively
-        result[pos] = vector.traverse(pos);
-      }
-      return result;
-    }
-
     // check same filter exists already
     private boolean filterExists(ReduceSinkOperator target, ExprNodeDesc replaced) {
       Operator<?> operator = target.getParentOperators().get(0);
@@ -226,6 +191,40 @@ public class PredicateTransitivePropagat
     }
   }
 
+  // calculate filter propagation directions for each alias
+  // L<->R for inner/semi join, L->R for left outer join, R->L for right outer join
+  public static int[][] getTargets(CommonJoinOperator<JoinDesc> join) {
+    JoinCondDesc[] conds = join.getConf().getConds();
+
+    int aliases = conds.length + 1;
+    Vectors vector = new Vectors(aliases);
+    for (JoinCondDesc cond : conds) {
+      int left = cond.getLeft();
+      int right = cond.getRight();
+      switch (cond.getType()) {
+        case JoinDesc.INNER_JOIN:
+        case JoinDesc.LEFT_SEMI_JOIN:
+          vector.add(left, right);
+          vector.add(right, left);
+          break;
+        case JoinDesc.LEFT_OUTER_JOIN:
+          vector.add(left, right);
+          break;
+        case JoinDesc.RIGHT_OUTER_JOIN:
+          vector.add(right, left);
+          break;
+        case JoinDesc.FULL_OUTER_JOIN:
+          break;
+      }
+    }
+    int[][] result = new int[aliases][];
+    for (int pos = 0 ; pos < aliases; pos++) {
+      // find all targets recursively
+      result[pos] = vector.traverse(pos);
+    }
+    return result;
+  }
+
   private static class Vectors {
 
     private Set<Integer>[] vector;
@@ -245,10 +244,11 @@ public class PredicateTransitivePropagat
     public int[] traverse(int pos) {
       Set<Integer> targets = new HashSet<Integer>();
       traverse(targets, pos);
-      return toArray(targets);
+      return toArray(targets, pos);
     }
 
-    private int[] toArray(Set<Integer> values) {
+    private int[] toArray(Set<Integer> values, int pos) {
+      values.remove(pos);
       int index = 0;
       int[] result = new int[values.size()];
       for (int value : values) {

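Note: getTargets is hoisted out of the inner processor class and made public static, and toArray now drops the starting position, so an alias is never reported as its own propagation target. A hedged sketch of the direction computation as plain set reachability (JoinCondDesc, Vectors and the real traverse are replaced by toy equivalents):

    // Sketch only: propagation-direction computation as set-based reachability,
    // mirroring getTargets/Vectors.traverse with toy join-condition records.
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class JoinTargetsSketch {
      static final int INNER = 0, LEFT_OUTER = 1, RIGHT_OUTER = 2, FULL_OUTER = 3;

      static class Cond {
        final int left, right, type;
        Cond(int l, int r, int t) { left = l; right = r; type = t; }
      }

      static List<Set<Integer>> buildEdges(Cond[] conds) {
        int aliases = conds.length + 1;
        List<Set<Integer>> edges = new ArrayList<Set<Integer>>();
        for (int i = 0; i < aliases; i++) {
          edges.add(new HashSet<Integer>());
        }
        for (Cond cond : conds) {
          switch (cond.type) {
            case INNER:       // L<->R for inner (and left semi) joins
              edges.get(cond.left).add(cond.right);
              edges.get(cond.right).add(cond.left);
              break;
            case LEFT_OUTER:  // L->R only
              edges.get(cond.left).add(cond.right);
              break;
            case RIGHT_OUTER: // R->L only
              edges.get(cond.right).add(cond.left);
              break;
            case FULL_OUTER:  // no propagation
              break;
          }
        }
        return edges;
      }

      // All positions reachable from pos, excluding pos itself (as the updated
      // toArray(values, pos) now does).
      static Set<Integer> targets(List<Set<Integer>> edges, int pos) {
        Set<Integer> seen = new HashSet<Integer>();
        traverse(edges, pos, seen);
        seen.remove(pos);
        return seen;
      }

      private static void traverse(List<Set<Integer>> edges, int pos, Set<Integer> seen) {
        if (!seen.add(pos)) {
          return;
        }
        for (int next : edges.get(pos)) {
          traverse(edges, next, seen);
        }
      }

      public static void main(String[] args) {
        // a INNER JOIN b LEFT OUTER JOIN c: a and b exchange filters both ways,
        // b's filters also flow to c, but c's filters flow nowhere.
        Cond[] conds = { new Cond(0, 1, INNER), new Cond(1, 2, LEFT_OUTER) };
        List<Set<Integer>> edges = buildEdges(conds);
        System.out.println(targets(edges, 0));  // positions reachable from a: b and c
        System.out.println(targets(edges, 2));  // positions reachable from c: none
      }
    }
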
Added: hive/trunk/ql/src/test/queries/clientpositive/auto_join33.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/auto_join33.q?rev=1636995&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/auto_join33.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/auto_join33.q Wed Nov  5 22:41:22 2014
@@ -0,0 +1,16 @@
+set hive.auto.convert.join = true;
+
+-- SORT_QUERY_RESULTS
+
+explain 
+SELECT * FROM
+  (SELECT * FROM src WHERE key+1 < 10) a
+    JOIN 
+  (SELECT * FROM src WHERE key+2 < 10) b
+    ON a.key+1=b.key+2;
+
+SELECT * FROM
+  (SELECT * FROM src WHERE key+1 < 10) a
+    JOIN
+  (SELECT * FROM src WHERE key+2 < 10) b
+    ON a.key+1=b.key+2;

Added: hive/trunk/ql/src/test/results/clientpositive/auto_join33.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/auto_join33.q.out?rev=1636995&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/auto_join33.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/auto_join33.q.out Wed Nov  5 22:41:22 2014
@@ -0,0 +1,113 @@
+PREHOOK: query: -- SORT_QUERY_RESULTS
+
+explain 
+SELECT * FROM
+  (SELECT * FROM src WHERE key+1 < 10) a
+    JOIN 
+  (SELECT * FROM src WHERE key+2 < 10) b
+    ON a.key+1=b.key+2
+PREHOOK: type: QUERY
+POSTHOOK: query: -- SORT_QUERY_RESULTS
+
+explain 
+SELECT * FROM
+  (SELECT * FROM src WHERE key+1 < 10) a
+    JOIN 
+  (SELECT * FROM src WHERE key+2 < 10) b
+    ON a.key+1=b.key+2
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-4 is a root stage
+  Stage-3 depends on stages: Stage-4
+  Stage-0 depends on stages: Stage-3
+
+STAGE PLANS:
+  Stage: Stage-4
+    Map Reduce Local Work
+      Alias -> Map Local Tables:
+        a:src 
+          Fetch Operator
+            limit: -1
+      Alias -> Map Local Operator Tree:
+        a:src 
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (((key + 1) < 10) and (key + 1) is not null) (type: boolean)
+              Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), value (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+                HashTable Sink Operator
+                  condition expressions:
+                    0 {_col0} {_col1}
+                    1 {_col0} {_col1}
+                  keys:
+                    0 (_col0 + 1) (type: double)
+                    1 (_col0 + 2) (type: double)
+
+  Stage: Stage-3
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Filter Operator
+              predicate: (((key + 2) < 10) and (key + 2) is not null) (type: boolean)
+              Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), value (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+                Map Join Operator
+                  condition map:
+                       Inner Join 0 to 1
+                  condition expressions:
+                    0 {_col0} {_col1}
+                    1 {_col0} {_col1}
+                  keys:
+                    0 (_col0 + 1) (type: double)
+                    1 (_col0 + 2) (type: double)
+                  outputColumnNames: _col0, _col1, _col2, _col3
+                  Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE
+                  Select Operator
+                    expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
+                    outputColumnNames: _col0, _col1, _col2, _col3
+                    Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+      Local Work:
+        Map Reduce Local Work
+
+  Stage: Stage-0
+    Fetch Operator
+      limit: -1
+      Processor Tree:
+        ListSink
+
+PREHOOK: query: SELECT * FROM
+  (SELECT * FROM src WHERE key+1 < 10) a
+    JOIN
+  (SELECT * FROM src WHERE key+2 < 10) b
+    ON a.key+1=b.key+2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM
+  (SELECT * FROM src WHERE key+1 < 10) a
+    JOIN
+  (SELECT * FROM src WHERE key+2 < 10) b
+    ON a.key+1=b.key+2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+5	val_5	4	val_4
+5	val_5	4	val_4
+5	val_5	4	val_4

Modified: hive/trunk/ql/src/test/results/clientpositive/vector_mapjoin_reduce.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/vector_mapjoin_reduce.q.out?rev=1636995&r1=1636994&r2=1636995&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/vector_mapjoin_reduce.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/vector_mapjoin_reduce.q.out Wed Nov  5 22:41:22 2014
@@ -373,17 +373,17 @@ STAGE PLANS:
             alias: lineitem
             Statistics: Num rows: 1099 Data size: 12099 Basic stats: COMPLETE Column stats: NONE
             Filter Operator
-              predicate: (((l_shipmode = 'AIR') and l_orderkey is not null) and l_linenumber is not null) (type: boolean)
-              Statistics: Num rows: 138 Data size: 1519 Basic stats: COMPLETE Column stats: NONE
+              predicate: ((((l_shipmode = 'AIR') and l_orderkey is not null) and l_linenumber is not null) and (l_linenumber = 1)) (type: boolean)
+              Statistics: Num rows: 69 Data size: 759 Basic stats: COMPLETE Column stats: NONE
               Select Operator
-                expressions: l_orderkey (type: int), l_linenumber (type: int)
+                expressions: l_orderkey (type: int), 1 (type: int)
                 outputColumnNames: _col0, _col1
-                Statistics: Num rows: 138 Data size: 1519 Basic stats: COMPLETE Column stats: NONE
+                Statistics: Num rows: 69 Data size: 759 Basic stats: COMPLETE Column stats: NONE
                 Group By Operator
                   keys: _col0 (type: int), _col1 (type: int)
                   mode: hash
                   outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 138 Data size: 1519 Basic stats: COMPLETE Column stats: NONE
+                  Statistics: Num rows: 69 Data size: 759 Basic stats: COMPLETE Column stats: NONE
                   HashTable Sink Operator
                     condition expressions:
                       0 {_col0} {_col3}