Posted to commits@hive.apache.org by ha...@apache.org on 2014/11/05 23:41:23 UTC
svn commit: r1636995 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/plan/ java/org/apache/hadoop/hive/ql/ppd/
test/queries/clientpositive/ test/results/clientpositive/
Author: hashutosh
Date: Wed Nov 5 22:41:22 2014
New Revision: 1636995
URL: http://svn.apache.org/r1636995
Log:
HIVE-7111 : Extend join transitivity PPD to non-column expressions (Navis via Ashutosh Chauhan)
Added:
hive/trunk/ql/src/test/queries/clientpositive/auto_join33.q
hive/trunk/ql/src/test/results/clientpositive/auto_join33.q.out
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java
hive/trunk/ql/src/test/results/clientpositive/vector_mapjoin_reduce.q.out
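The net effect of the patch: transitive predicate propagation moves from the join processor, where the old applyFilterTransitivity() could only match simple column references recovered from ASTNodes, to a new ReduceSinkPPD processor that works positionally on ReduceSink key expressions, so predicates over non-column expressions such as a.key+1 can be replicated across equijoin conditions. A minimal, self-contained sketch of that substitution idea follows; the Expr class and every name in it are illustrative stand-ins rather than Hive's ExprNodeDesc API, and Hive's real ExprNodeDescUtils.replace() can instead return null when the rewrite is not possible.

    import java.util.Arrays;
    import java.util.List;

    public class TransitivitySketch {

      // A tiny immutable expression tree; leaves carry no children.
      static class Expr {
        final String op;          // e.g. "col:a.key", "const:10", "+", "<"
        final List<Expr> children;
        Expr(String op, Expr... children) {
          this.op = op;
          this.children = Arrays.asList(children);
        }
        boolean same(Expr other) {  // structural equality
          if (!op.equals(other.op) || children.size() != other.children.size()) {
            return false;
          }
          for (int i = 0; i < children.size(); i++) {
            if (!children.get(i).same(other.children.get(i))) {
              return false;
            }
          }
          return true;
        }
      }

      // Replace any subtree equal to sourceKeys[i] with targetKeys[i].
      static Expr replace(Expr expr, List<Expr> sourceKeys, List<Expr> targetKeys) {
        for (int i = 0; i < sourceKeys.size(); i++) {
          if (expr.same(sourceKeys.get(i))) {
            return targetKeys.get(i);
          }
        }
        Expr[] newChildren = new Expr[expr.children.size()];
        for (int i = 0; i < newChildren.length; i++) {
          newChildren[i] = replace(expr.children.get(i), sourceKeys, targetKeys);
        }
        return new Expr(expr.op, newChildren);
      }

      public static void main(String[] args) {
        Expr aKey1 = new Expr("+", new Expr("col:a.key"), new Expr("const:1"));
        Expr bKey2 = new Expr("+", new Expr("col:b.key"), new Expr("const:2"));
        // pushable predicate on branch a: (a.key + 1) < 10
        Expr pred = new Expr("<", aKey1, new Expr("const:10"));
        // with join condition a.key+1 = b.key+2, rewrite it for branch b:
        Expr forB = replace(pred, Arrays.asList(aKey1), Arrays.asList(bKey2));
        // forB is now (b.key + 2) < 10
      }
    }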
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java?rev=1636995&r1=1636994&r2=1636995&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java Wed Nov 5 22:41:22 2014
@@ -193,6 +193,7 @@ public class ExprNodeDescUtils {
/**
* Convert expressions in current operator to those in terminal operator, which
* is an ancestor of current or null (back to top operator).
+ * The result may contain null values for non-traceable exprs.
*/
public static ArrayList<ExprNodeDesc> backtrack(List<ExprNodeDesc> sources,
Operator<?> current, Operator<?> terminal) throws SemanticException {
@@ -396,29 +397,34 @@ public class ExprNodeDescUtils {
* Get Map of ExprNodeColumnDesc HashCode to ExprNodeColumnDesc.
*
* @param exprDesc
- * @param hashCodeTocolumnDescMap
+ * @param hashCodeToColumnDescMap
* Assumption: If two ExprNodeColumnDesc have same hash code then
* they are logically referring to same projection
*/
public static void getExprNodeColumnDesc(ExprNodeDesc exprDesc,
- Map<Integer, ExprNodeDesc> hashCodeTocolumnDescMap) {
+ Map<Integer, ExprNodeDesc> hashCodeToColumnDescMap) {
if (exprDesc instanceof ExprNodeColumnDesc) {
- hashCodeTocolumnDescMap.put(
- ((ExprNodeColumnDesc) exprDesc).hashCode(),
- ((ExprNodeColumnDesc) exprDesc));
+ hashCodeToColumnDescMap.put(exprDesc.hashCode(), exprDesc);
} else if (exprDesc instanceof ExprNodeColumnListDesc) {
- for (ExprNodeDesc child : ((ExprNodeColumnListDesc) exprDesc)
- .getChildren()) {
- getExprNodeColumnDesc(child, hashCodeTocolumnDescMap);
+ for (ExprNodeDesc child : exprDesc.getChildren()) {
+ getExprNodeColumnDesc(child, hashCodeToColumnDescMap);
}
} else if (exprDesc instanceof ExprNodeGenericFuncDesc) {
- for (ExprNodeDesc child : ((ExprNodeGenericFuncDesc) exprDesc)
- .getChildren()) {
- getExprNodeColumnDesc(child, hashCodeTocolumnDescMap);
+ for (ExprNodeDesc child : exprDesc.getChildren()) {
+ getExprNodeColumnDesc(child, hashCodeToColumnDescMap);
}
} else if (exprDesc instanceof ExprNodeFieldDesc) {
getExprNodeColumnDesc(((ExprNodeFieldDesc) exprDesc).getDesc(),
- hashCodeTocolumnDescMap);
+ hashCodeToColumnDescMap);
}
}
+
+ public static boolean isAllConstants(List<ExprNodeDesc> value) {
+ for (ExprNodeDesc expr : value) {
+ if (!(expr instanceof ExprNodeConstantDesc)) {
+ return false;
+ }
+ }
+ return true;
+ }
}
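A short usage sketch of the new isAllConstants() helper (the driver class and literal values here are hypothetical; the Hive types and the call are the ones this hunk adds). Constant-only predicate lists matter because a predicate like WHERE FALSE folds to a bare constant associated with no table alias, the null-keyed case that the OpProcFactory changes below start propagating:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
    import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class IsAllConstantsSketch {
      public static void main(String[] args) {
        // a WHERE FALSE predicate reduces to a lone boolean constant
        ExprNodeDesc falseDesc = new ExprNodeConstantDesc(Boolean.FALSE);
        List<ExprNodeDesc> constantsOnly = Arrays.asList(falseDesc);
        System.out.println(ExprNodeDescUtils.isAllConstants(constantsOnly)); // true

        ExprNodeDesc keyCol = new ExprNodeColumnDesc(
            TypeInfoFactory.stringTypeInfo, "key", "a", false);
        List<ExprNodeDesc> mixed = Arrays.asList(falseDesc, keyCol);
        System.out.println(ExprNodeDescUtils.isAllConstants(mixed)); // false
      }
    }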
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java?rev=1636995&r1=1636994&r2=1636995&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerInfo.java Wed Nov 5 22:41:22 2014
@@ -19,16 +19,13 @@ package org.apache.hadoop.hive.ql.ppd;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -143,7 +140,7 @@ public class ExprWalkerInfo implements N
* @return converted expression for given node. If there is none then returns
* null.
*/
- public ExprNodeDesc getConvertedNode(Node nd) {
+ public ExprNodeDesc getConvertedNode(ExprNodeDesc nd) {
ExprInfo ei = exprInfoMap.get(nd);
if (ei == null) {
return null;
@@ -238,11 +235,11 @@ public class ExprWalkerInfo implements N
* @param expr
*/
public void addFinalCandidate(ExprNodeDesc expr) {
- String alias = getAlias(expr);
- if (pushdownPreds.get(alias) == null) {
- pushdownPreds.put(alias, new ArrayList<ExprNodeDesc>());
- }
- pushdownPreds.get(alias).add(expr);
+ addFinalCandidate(getAlias(expr), expr);
+ }
+
+ public void addFinalCandidate(String alias, ExprNodeDesc expr) {
+ getPushdownPreds(alias).add(expr);
}
/**
@@ -252,10 +249,7 @@ public class ExprWalkerInfo implements N
* @param pushDowns
*/
public void addPushDowns(String alias, List<ExprNodeDesc> pushDowns) {
- if (pushdownPreds.get(alias) == null) {
- pushdownPreds.put(alias, new ArrayList<ExprNodeDesc>());
- }
- pushdownPreds.get(alias).addAll(pushDowns);
+ getPushdownPreds(alias).addAll(pushDowns);
}
/**
@@ -269,6 +263,26 @@ public class ExprWalkerInfo implements N
return pushdownPreds;
}
+ private List<ExprNodeDesc> getPushdownPreds(String alias) {
+ List<ExprNodeDesc> predicates = pushdownPreds.get(alias);
+ if (predicates == null) {
+ pushdownPreds.put(alias, predicates = new ArrayList<ExprNodeDesc>());
+ }
+ return predicates;
+ }
+
+ public boolean hasAnyCandidates() {
+ if (pushdownPreds == null || pushdownPreds.isEmpty()) {
+ return false;
+ }
+ for (List<ExprNodeDesc> exprs : pushdownPreds.values()) {
+ if (!exprs.isEmpty()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Adds the specified expr as a non-final candidate
*
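The new hasAnyCandidates() looks inside the per-alias lists rather than only at the map. A minimal sketch (plain Java, hypothetical class, outside Hive) of the distinction: the pushdownPreds map can hold an alias mapped to an empty list, for example after that alias's predicates were removed elsewhere in PPD, so a bare isEmpty() check on the map reports candidates where none remain.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class HasAnyCandidatesSketch {
      public static void main(String[] args) {
        Map<String, List<String>> pushdownPreds = new HashMap<String, List<String>>();
        pushdownPreds.put("a", new ArrayList<String>()); // alias key, empty list
        System.out.println(pushdownPreds.isEmpty());     // false -- misleading
        boolean any = false;                             // what hasAnyCandidates() computes
        for (List<String> exprs : pushdownPreds.values()) {
          if (!exprs.isEmpty()) {
            any = true;
            break;
          }
        }
        System.out.println(any); // false -- nothing left to push
      }
    }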
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java?rev=1636995&r1=1636994&r2=1636995&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java Wed Nov 5 22:41:22 2014
@@ -31,7 +31,6 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
@@ -48,9 +47,6 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.ASTNode;
-import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
-import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -342,14 +338,12 @@ public final class OpProcFactory {
super.process(nd, stack, procCtx, nodeOutputs);
OpWalkerInfo owi = (OpWalkerInfo) procCtx;
ExprWalkerInfo prunedPred = owi.getPrunedPreds((Operator<? extends OperatorDesc>) nd);
- if (prunedPred == null) {
+ if (prunedPred == null || !prunedPred.hasAnyCandidates()) {
return null;
}
Map<String, List<ExprNodeDesc>> candidates = prunedPred.getFinalCandidates();
- if (candidates != null && !candidates.isEmpty()) {
- createFilter((Operator)nd, prunedPred, owi);
- candidates.clear();
- }
+ createFilter((Operator)nd, prunedPred, owi);
+ candidates.clear();
return null;
}
@@ -476,7 +470,12 @@ public final class OpProcFactory {
Set<String> toRemove = new HashSet<String>();
// we don't push down any expressions that refer to aliases that can't
// be pushed down per getQualifiedAliases
- for (String key : prunePreds.getFinalCandidates().keySet()) {
+ for (Entry<String, List<ExprNodeDesc>> entry : prunePreds.getFinalCandidates().entrySet()) {
+ String key = entry.getKey();
+ List<ExprNodeDesc> value = entry.getValue();
+ if (key == null && ExprNodeDescUtils.isAllConstants(value)) {
+ continue; // propagate constants
+ }
if (!aliases.contains(key)) {
toRemove.add(key);
}
@@ -517,199 +516,6 @@ public final class OpProcFactory {
return getQualifiedAliases((JoinOperator) nd, owi.getRowResolver(nd));
}
- @Override
- protected Object handlePredicates(Node nd, ExprWalkerInfo prunePreds, OpWalkerInfo owi)
- throws SemanticException {
- if (HiveConf.getBoolVar(owi.getParseContext().getConf(),
- HiveConf.ConfVars.HIVEPPDRECOGNIZETRANSITIVITY)) {
- applyFilterTransitivity((JoinOperator) nd, owi);
- }
- return super.handlePredicates(nd, prunePreds, owi);
- }
-
- /**
- * Adds additional pushdown predicates for a join operator by replicating
- * filters transitively over all the equijoin conditions.
- *
- * If we have a predicate "t.col=1" and the equijoin conditions
- * "t.col=s.col" and "t.col=u.col", we add the filters "s.col=1" and
- * "u.col=1". Note that this does not depend on the types of joins (ie.
- * inner, left/right/full outer) between the tables s, t and u because if
- * a predicate, eg. "t.col=1" is present in getFinalCandidates() at this
- * point, we have already verified that it can be pushed down, so any rows
- * emitted must satisfy s.col=t.col=u.col=1 and replicating the filters
- * like this is ok.
- */
- private void applyFilterTransitivity(JoinOperator nd, OpWalkerInfo owi)
- throws SemanticException {
- ExprWalkerInfo prunePreds =
- owi.getPrunedPreds(nd);
- if (prunePreds != null) {
- // We want to use the row resolvers of the parents of the join op
- // because the rowresolver refers to the output columns of an operator
- // and the filters at this point refer to the input columns of the join
- // operator.
- Map<String, RowResolver> aliasToRR =
- new HashMap<String, RowResolver>();
- for (Operator<? extends OperatorDesc> o : (nd).getParentOperators()) {
- for (String alias : owi.getRowResolver(o).getTableNames()){
- aliasToRR.put(alias, owi.getRowResolver(o));
- }
- }
-
- // eqExpressions is a list of ArrayList<ASTNode>'s, one for each table
- // in the join. Then for each i, j and k, the join condition is that
- // eqExpressions[i][k]=eqExpressions[j][k] (*) (ie. the columns referenced
- // by the corresponding ASTNodes are equal). For example, if the query
- // was SELECT * FROM a join b on a.col=b.col and a.col2=b.col2 left
- // outer join c on b.col=c.col and b.col2=c.col2 WHERE c.col=1,
- // eqExpressions would be [[a.col1, a.col2], [b.col1, b.col2],
- // [c.col1, c.col2]].
- //
- // numEqualities is the number of equal columns in each equality
- // "chain" and numColumns is the number of such chains.
- //
- // Note that (*) is guaranteed to be true for the
- // join operator: if the equijoin condititions can't be expressed in
- // these equal-length lists of equal columns (for example if we had the
- // query SELECT * FROM a join b on a.col=b.col and a.col2=b.col2 left
- // outer join c on b.col=c.col), more than one join operator is used.
- ArrayList<ArrayList<ASTNode>> eqExpressions =
- owi.getParseContext().getJoinContext().get(nd).getExpressions();
- int numColumns = eqExpressions.size();
- int numEqualities = eqExpressions.get(0).size();
-
- // oldFilters contains the filters to be pushed down
- Map<String, List<ExprNodeDesc>> oldFilters =
- prunePreds.getFinalCandidates();
- Map<String, List<ExprNodeDesc>> newFilters =
- new HashMap<String, List<ExprNodeDesc>>();
-
- // We loop through for each chain of equalities
- for (int i=0; i<numEqualities; i++) {
- // equalColumns[i] is the ColumnInfo corresponding to the ith term
- // of the equality or null if the term is not a simple column
- // reference
- ColumnInfo[] equalColumns=new ColumnInfo[numColumns];
- for (int j=0; j<numColumns; j++) {
- equalColumns[j] =
- getColumnInfoFromAST(eqExpressions.get(j).get(i), aliasToRR);
- }
- for (int j=0; j<numColumns; j++) {
- for (int k=0; k<numColumns; k++) {
- if (j != k && equalColumns[j]!= null
- && equalColumns[k] != null) {
- // terms j and k in the equality chain are simple columns,
- // so we can replace instances of column j with column k
- // in the filter and ad the replicated filter.
- ColumnInfo left = equalColumns[j];
- ColumnInfo right = equalColumns[k];
- if (oldFilters.get(left.getTabAlias()) != null){
- for (ExprNodeDesc expr :
- oldFilters.get(left.getTabAlias())) {
- // Only replicate the filter if there is exactly one column
- // referenced
- Set<String> colsreferenced =
- new HashSet<String>(expr.getCols());
- if (colsreferenced.size() == 1
- && colsreferenced.contains(left.getInternalName())){
- ExprNodeDesc newexpr = expr.clone();
- // Replace the column reference in the filter
- replaceColumnReference(newexpr, left.getInternalName(),
- right.getInternalName());
- if (newFilters.get(right.getTabAlias()) == null) {
- newFilters.put(right.getTabAlias(),
- new ArrayList<ExprNodeDesc>());
- }
- newFilters.get(right.getTabAlias()).add(newexpr);
- }
- }
- }
- }
- }
- }
- }
- // Push where false filter transitively
- Map<String,List<ExprNodeDesc>> candidates = prunePreds.getNonFinalCandidates();
- List<ExprNodeDesc> exprs;
- // where false is not associated with any alias in candidates
- if (null != candidates && candidates.get(null) != null && ((exprs = candidates.get(null)) != null)) {
- Iterator<ExprNodeDesc> itr = exprs.iterator();
- while (itr.hasNext()) {
- ExprNodeDesc expr = itr.next();
- if (expr instanceof ExprNodeConstantDesc && Boolean.FALSE.equals(((ExprNodeConstantDesc)expr).getValue())) {
- // push this 'where false' expr to all aliases
- for (String alias : aliasToRR.keySet()) {
- List<ExprNodeDesc> pushedFilters = newFilters.get(alias);
- if (null == pushedFilters) {
- newFilters.put(alias, new ArrayList<ExprNodeDesc>());
-
- }
- newFilters.get(alias).add(expr);
- }
- // this filter is pushed, we can remove it from non-final candidates.
- itr.remove();
- }
- }
- }
- for (Entry<String, List<ExprNodeDesc>> aliasToFilters
- : newFilters.entrySet()){
- owi.getPrunedPreds(nd)
- .addPushDowns(aliasToFilters.getKey(), aliasToFilters.getValue());
- }
- }
- }
-
- /**
- * Replaces the ColumnInfo for the column referred to by an ASTNode
- * representing "table.column" or null if the ASTNode is not in that form
- */
- private ColumnInfo getColumnInfoFromAST(ASTNode nd,
- Map<String, RowResolver> aliastoRR) throws SemanticException {
- // this bit is messy since we are parsing an ASTNode at this point
- if (nd.getType()==HiveParser.DOT) {
- if (nd.getChildCount()==2) {
- if (nd.getChild(0).getType()==HiveParser.TOK_TABLE_OR_COL
- && nd.getChild(0).getChildCount()==1
- && nd.getChild(1).getType()==HiveParser.Identifier){
- // We unescape the identifiers and make them lower case--this
- // really shouldn't be done here, but getExpressions gives us the
- // raw ASTNodes. The same thing is done in SemanticAnalyzer.
- // parseJoinCondPopulateAlias().
- String alias = BaseSemanticAnalyzer.unescapeIdentifier(
- nd.getChild(0).getChild(0).getText().toLowerCase());
- String column = BaseSemanticAnalyzer.unescapeIdentifier(
- nd.getChild(1).getText().toLowerCase());
- RowResolver rr=aliastoRR.get(alias);
- if (rr == null) {
- return null;
- }
- return rr.get(alias, column);
- }
- }
- }
- return null;
- }
-
- /**
- * Replaces all instances of oldColumn with newColumn in the
- * ExprColumnDesc's of the ExprNodeDesc
- */
- private void replaceColumnReference(ExprNodeDesc expr,
- String oldColumn, String newColumn) {
- if (expr instanceof ExprNodeColumnDesc) {
- if (((ExprNodeColumnDesc) expr).getColumn().equals(oldColumn)){
- ((ExprNodeColumnDesc) expr).setColumn(newColumn);
- }
- }
-
- if (expr.getChildren() != null){
- for (ExprNodeDesc childexpr : expr.getChildren()) {
- replaceColumnReference(childexpr, oldColumn, newColumn);
- }
- }
- }
-
/**
* Figures out the aliases for whom it is safe to push predicates based on
* ANSI SQL semantics. The join conditions are left associative so "a
@@ -760,6 +566,86 @@ public final class OpProcFactory {
}
}
+ public static class ReduceSinkPPD extends DefaultPPD implements NodeProcessor {
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ super.process(nd, stack, procCtx, nodeOutputs);
+ Operator<?> operator = (Operator<?>) nd;
+ OpWalkerInfo owi = (OpWalkerInfo) procCtx;
+ if (operator.getNumChild() == 1 &&
+ operator.getChildOperators().get(0) instanceof JoinOperator) {
+ if (HiveConf.getBoolVar(owi.getParseContext().getConf(),
+ HiveConf.ConfVars.HIVEPPDRECOGNIZETRANSITIVITY)) {
+ JoinOperator child = (JoinOperator) operator.getChildOperators().get(0);
+ int targetPos = child.getParentOperators().indexOf(operator);
+ applyFilterTransitivity(child, targetPos, owi);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Adds additional pushdown predicates for a join operator by replicating
+ * filters transitively over all the equijoin conditions.
+ *
+ * If we have a predicate "t.col=1" and the equijoin conditions
+ * "t.col=s.col" and "t.col=u.col", we add the filters "s.col=1" and
+ * "u.col=1". Note that this does not depend on the types of joins (ie.
+ * inner, left/right/full outer) between the tables s, t and u because if
+ * a predicate, eg. "t.col=1" is present in getFinalCandidates() at this
+ * point, we have already verified that it can be pushed down, so any rows
+ * emitted must satisfy s.col=t.col=u.col=1 and replicating the filters
+ * like this is ok.
+ */
+ private void applyFilterTransitivity(JoinOperator join, int targetPos, OpWalkerInfo owi)
+ throws SemanticException {
+
+ ExprWalkerInfo joinPreds = owi.getPrunedPreds(join);
+ if (joinPreds == null || !joinPreds.hasAnyCandidates()) {
+ return;
+ }
+ Map<String, List<ExprNodeDesc>> oldFilters = joinPreds.getFinalCandidates();
+ Map<String, List<ExprNodeDesc>> newFilters = new HashMap<String, List<ExprNodeDesc>>();
+
+ List<Operator<? extends OperatorDesc>> parentOperators = join.getParentOperators();
+
+ ReduceSinkOperator target = (ReduceSinkOperator) parentOperators.get(targetPos);
+ List<ExprNodeDesc> targetKeys = target.getConf().getKeyCols();
+
+ ExprWalkerInfo rsPreds = owi.getPrunedPreds(target);
+ for (int sourcePos = 0; sourcePos < parentOperators.size(); sourcePos++) {
+ ReduceSinkOperator source = (ReduceSinkOperator) parentOperators.get(sourcePos);
+ List<ExprNodeDesc> sourceKeys = source.getConf().getKeyCols();
+ Set<String> sourceAliases = new HashSet<String>(Arrays.asList(source.getInputAliases()));
+ for (Map.Entry<String, List<ExprNodeDesc>> entry : oldFilters.entrySet()) {
+ if (entry.getKey() == null && ExprNodeDescUtils.isAllConstants(entry.getValue())) {
+ // propagate constants
+ for (String targetAlias : target.getInputAliases()) {
+ rsPreds.addPushDowns(targetAlias, entry.getValue());
+ }
+ continue;
+ }
+ if (!sourceAliases.contains(entry.getKey())) {
+ continue;
+ }
+ for (ExprNodeDesc predicate : entry.getValue()) {
+ ExprNodeDesc backtrack = ExprNodeDescUtils.backtrack(predicate, join, source);
+ if (backtrack == null) {
+ continue;
+ }
+ ExprNodeDesc replaced = ExprNodeDescUtils.replace(backtrack, sourceKeys, targetKeys);
+ if (replaced == null) {
+ continue;
+ }
+ for (String targetAlias : target.getInputAliases()) {
+ rsPreds.addFinalCandidate(targetAlias, replaced);
+ }
+ }
+ }
+ }
+ }
+ }
+
/**
* Default processor which just merges its children.
*/
@@ -900,11 +786,10 @@ public final class OpProcFactory {
protected static Object createFilter(Operator op,
ExprWalkerInfo pushDownPreds, OpWalkerInfo owi) {
- if (pushDownPreds == null || pushDownPreds.getFinalCandidates() == null
- || pushDownPreds.getFinalCandidates().size() == 0) {
- return null;
+ if (pushDownPreds != null && pushDownPreds.hasAnyCandidates()) {
+ return createFilter(op, pushDownPreds.getFinalCandidates(), owi);
}
- return createFilter(op, pushDownPreds.getFinalCandidates(), owi);
+ return null;
}
protected static Object createFilter(Operator op,
@@ -1113,6 +998,10 @@ public final class OpProcFactory {
return new JoinerPPD();
}
+ public static NodeProcessor getRSProc() {
+ return new ReduceSinkPPD();
+ }
+
private OpProcFactory() {
// prevent instantiation
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java?rev=1636995&r1=1636994&r2=1636995&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicatePushDown.java Wed Nov 5 22:41:22 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.La
import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.PTFOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.ScriptOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.UDTFOperator;
@@ -118,6 +119,9 @@ public class PredicatePushDown implement
opRules.put(new RuleRegExp("R9",
LateralViewJoinOperator.getOperatorName() + "%"),
OpProcFactory.getLVJProc());
+ opRules.put(new RuleRegExp("R10",
+ ReduceSinkOperator.getOperatorName() + "%"),
+ OpProcFactory.getRSProc());
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java?rev=1636995&r1=1636994&r2=1636995&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/PredicateTransitivePropagate.java Wed Nov 5 22:41:22 2014
@@ -30,7 +30,6 @@ import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
@@ -179,40 +178,6 @@ public class PredicateTransitivePropagat
return null;
}
- // calculate filter propagation directions for each alias
- // L<->R for innner/semi join, L->R for left outer join, R->L for right outer join
- private int[][] getTargets(CommonJoinOperator<JoinDesc> join) {
- JoinCondDesc[] conds = join.getConf().getConds();
-
- int aliases = conds.length + 1;
- Vectors vector = new Vectors(aliases);
- for (JoinCondDesc cond : conds) {
- int left = cond.getLeft();
- int right = cond.getRight();
- switch (cond.getType()) {
- case JoinDesc.INNER_JOIN:
- case JoinDesc.LEFT_SEMI_JOIN:
- vector.add(left, right);
- vector.add(right, left);
- break;
- case JoinDesc.LEFT_OUTER_JOIN:
- vector.add(left, right);
- break;
- case JoinDesc.RIGHT_OUTER_JOIN:
- vector.add(right, left);
- break;
- case JoinDesc.FULL_OUTER_JOIN:
- break;
- }
- }
- int[][] result = new int[aliases][];
- for (int pos = 0 ; pos < aliases; pos++) {
- // find all targets recursively
- result[pos] = vector.traverse(pos);
- }
- return result;
- }
-
// check same filter exists already
private boolean filterExists(ReduceSinkOperator target, ExprNodeDesc replaced) {
Operator<?> operator = target.getParentOperators().get(0);
@@ -226,6 +191,40 @@ public class PredicateTransitivePropagat
}
}
+ // calculate filter propagation directions for each alias
+ // L<->R for inner/semi join, L->R for left outer join, R->L for right outer join
+ public static int[][] getTargets(CommonJoinOperator<JoinDesc> join) {
+ JoinCondDesc[] conds = join.getConf().getConds();
+
+ int aliases = conds.length + 1;
+ Vectors vector = new Vectors(aliases);
+ for (JoinCondDesc cond : conds) {
+ int left = cond.getLeft();
+ int right = cond.getRight();
+ switch (cond.getType()) {
+ case JoinDesc.INNER_JOIN:
+ case JoinDesc.LEFT_SEMI_JOIN:
+ vector.add(left, right);
+ vector.add(right, left);
+ break;
+ case JoinDesc.LEFT_OUTER_JOIN:
+ vector.add(left, right);
+ break;
+ case JoinDesc.RIGHT_OUTER_JOIN:
+ vector.add(right, left);
+ break;
+ case JoinDesc.FULL_OUTER_JOIN:
+ break;
+ }
+ }
+ int[][] result = new int[aliases][];
+ for (int pos = 0 ; pos < aliases; pos++) {
+ // find all targets recursively
+ result[pos] = vector.traverse(pos);
+ }
+ return result;
+ }
+
private static class Vectors {
private Set<Integer>[] vector;
@@ -245,10 +244,11 @@ public class PredicateTransitivePropagat
public int[] traverse(int pos) {
Set<Integer> targets = new HashSet<Integer>();
traverse(targets, pos);
- return toArray(targets);
+ return toArray(targets, pos);
}
- private int[] toArray(Set<Integer> values) {
+ private int[] toArray(Set<Integer> values, int pos) {
+ values.remove(pos);
int index = 0;
int[] result = new int[values.size()];
for (int value : values) {
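getTargets() is now public static, and traverse() excludes the starting position from its own target list via the new toArray(values, pos). Below is a minimal, self-contained model (hypothetical names, plain ints for aliases) of the direction computation for a plan like a INNER JOIN b LEFT OUTER JOIN c: the inner join propagates filters both ways, the left outer join only left to right, and traversal follows the edges transitively.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class PropagationDirectionSketch {
      // edges.get(i) = aliases that directly receive filters from alias i
      static Set<Integer> traverse(List<Set<Integer>> edges, int pos) {
        Set<Integer> targets = new HashSet<Integer>();
        Deque<Integer> pending = new ArrayDeque<Integer>();
        pending.push(pos);
        while (!pending.isEmpty()) {
          for (int next : edges.get(pending.pop())) {
            if (targets.add(next)) {
              pending.push(next);
            }
          }
        }
        targets.remove(pos); // the fix in this commit: an alias never targets itself
        return targets;
      }

      public static void main(String[] args) {
        // a INNER JOIN b LEFT OUTER JOIN c
        List<Set<Integer>> edges = new ArrayList<Set<Integer>>();
        edges.add(new HashSet<Integer>(Arrays.asList(1)));    // a -> {b}
        edges.add(new HashSet<Integer>(Arrays.asList(0, 2))); // b -> {a, c}
        edges.add(new HashSet<Integer>());                    // c -> {}
        System.out.println(traverse(edges, 0)); // {1, 2}: a's filters reach b and c
        System.out.println(traverse(edges, 2)); // {}: c is the null-supplying side
      }
    }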
Added: hive/trunk/ql/src/test/queries/clientpositive/auto_join33.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/auto_join33.q?rev=1636995&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/auto_join33.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/auto_join33.q Wed Nov 5 22:41:22 2014
@@ -0,0 +1,16 @@
+set hive.auto.convert.join = true;
+
+-- SORT_QUERY_RESULTS
+
+explain
+SELECT * FROM
+ (SELECT * FROM src WHERE key+1 < 10) a
+ JOIN
+ (SELECT * FROM src WHERE key+2 < 10) b
+ ON a.key+1=b.key+2;
+
+SELECT * FROM
+ (SELECT * FROM src WHERE key+1 < 10) a
+ JOIN
+ (SELECT * FROM src WHERE key+2 < 10) b
+ ON a.key+1=b.key+2;
Added: hive/trunk/ql/src/test/results/clientpositive/auto_join33.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/auto_join33.q.out?rev=1636995&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/auto_join33.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/auto_join33.q.out Wed Nov 5 22:41:22 2014
@@ -0,0 +1,113 @@
+PREHOOK: query: -- SORT_QUERY_RESULTS
+
+explain
+SELECT * FROM
+ (SELECT * FROM src WHERE key+1 < 10) a
+ JOIN
+ (SELECT * FROM src WHERE key+2 < 10) b
+ ON a.key+1=b.key+2
+PREHOOK: type: QUERY
+POSTHOOK: query: -- SORT_QUERY_RESULTS
+
+explain
+SELECT * FROM
+ (SELECT * FROM src WHERE key+1 < 10) a
+ JOIN
+ (SELECT * FROM src WHERE key+2 < 10) b
+ ON a.key+1=b.key+2
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+ Stage-4 is a root stage
+ Stage-3 depends on stages: Stage-4
+ Stage-0 depends on stages: Stage-3
+
+STAGE PLANS:
+ Stage: Stage-4
+ Map Reduce Local Work
+ Alias -> Map Local Tables:
+ a:src
+ Fetch Operator
+ limit: -1
+ Alias -> Map Local Operator Tree:
+ a:src
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (((key + 1) < 10) and (key + 1) is not null) (type: boolean)
+ Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+ HashTable Sink Operator
+ condition expressions:
+ 0 {_col0} {_col1}
+ 1 {_col0} {_col1}
+ keys:
+ 0 (_col0 + 1) (type: double)
+ 1 (_col0 + 2) (type: double)
+
+ Stage: Stage-3
+ Map Reduce
+ Map Operator Tree:
+ TableScan
+ alias: src
+ Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+ Filter Operator
+ predicate: (((key + 2) < 10) and (key + 2) is not null) (type: boolean)
+ Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: key (type: string), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 83 Data size: 881 Basic stats: COMPLETE Column stats: NONE
+ Map Join Operator
+ condition map:
+ Inner Join 0 to 1
+ condition expressions:
+ 0 {_col0} {_col1}
+ 1 {_col0} {_col1}
+ keys:
+ 0 (_col0 + 1) (type: double)
+ 1 (_col0 + 2) (type: double)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE
+ Select Operator
+ expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
+ Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 91 Data size: 969 Basic stats: COMPLETE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ Local Work:
+ Map Reduce Local Work
+
+ Stage: Stage-0
+ Fetch Operator
+ limit: -1
+ Processor Tree:
+ ListSink
+
+PREHOOK: query: SELECT * FROM
+ (SELECT * FROM src WHERE key+1 < 10) a
+ JOIN
+ (SELECT * FROM src WHERE key+2 < 10) b
+ ON a.key+1=b.key+2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM
+ (SELECT * FROM src WHERE key+1 < 10) a
+ JOIN
+ (SELECT * FROM src WHERE key+2 < 10) b
+ ON a.key+1=b.key+2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+#### A masked pattern was here ####
+5 val_5 4 val_4
+5 val_5 4 val_4
+5 val_5 4 val_4
Modified: hive/trunk/ql/src/test/results/clientpositive/vector_mapjoin_reduce.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/vector_mapjoin_reduce.q.out?rev=1636995&r1=1636994&r2=1636995&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/vector_mapjoin_reduce.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/vector_mapjoin_reduce.q.out Wed Nov 5 22:41:22 2014
@@ -373,17 +373,17 @@ STAGE PLANS:
alias: lineitem
Statistics: Num rows: 1099 Data size: 12099 Basic stats: COMPLETE Column stats: NONE
Filter Operator
- predicate: (((l_shipmode = 'AIR') and l_orderkey is not null) and l_linenumber is not null) (type: boolean)
- Statistics: Num rows: 138 Data size: 1519 Basic stats: COMPLETE Column stats: NONE
+ predicate: ((((l_shipmode = 'AIR') and l_orderkey is not null) and l_linenumber is not null) and (l_linenumber = 1)) (type: boolean)
+ Statistics: Num rows: 69 Data size: 759 Basic stats: COMPLETE Column stats: NONE
Select Operator
- expressions: l_orderkey (type: int), l_linenumber (type: int)
+ expressions: l_orderkey (type: int), 1 (type: int)
outputColumnNames: _col0, _col1
- Statistics: Num rows: 138 Data size: 1519 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 69 Data size: 759 Basic stats: COMPLETE Column stats: NONE
Group By Operator
keys: _col0 (type: int), _col1 (type: int)
mode: hash
outputColumnNames: _col0, _col1
- Statistics: Num rows: 138 Data size: 1519 Basic stats: COMPLETE Column stats: NONE
+ Statistics: Num rows: 69 Data size: 759 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
condition expressions:
0 {_col0} {_col3}