You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2014/10/14 21:07:05 UTC
svn commit: r1631841 [12/42] - in /hive/branches/llap: ./ accumulo-handler/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/
accumulo-handler/src/java/org/apache/hadoop/hiv...
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Tue Oct 14 19:06:45 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.optimizer;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -29,12 +30,17 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MuxOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -42,12 +48,16 @@ import org.apache.hadoop.hive.ql.parse.O
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.OpTraits;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* ConvertJoinMapJoin is an optimization that replaces a common join
@@ -60,39 +70,46 @@ public class ConvertJoinMapJoin implemen
static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
+ @SuppressWarnings("unchecked")
@Override
- /*
- * (non-Javadoc)
- * we should ideally not modify the tree we traverse.
- * However, since we need to walk the tree at any time when we modify the
- * operator, we might as well do it here.
- */
- public Object process(Node nd, Stack<Node> stack,
- NodeProcessorCtx procCtx, Object... nodeOutputs)
- throws SemanticException {
+ /*
+ * (non-Javadoc) we should ideally not modify the tree we traverse. However,
+ * since we need to walk the tree at any time when we modify the operator, we
+ * might as well do it here.
+ */
+ public Object
+ process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+ throws SemanticException {
OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
- if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
+ JoinOperator joinOp = (JoinOperator) nd;
+
+ if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)
+ && !(context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) {
+ // we are just converting to a common merge join operator, i.e. the
+ // equivalent of the shuffle join in the map-reduce case.
+ int pos = 0; // it doesn't matter which position we use in this case.
+ convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
return null;
}
- JoinOperator joinOp = (JoinOperator) nd;
- // if we have traits, and table info is present in the traits, we know the
+ // if we have traits, and table info is present in the traits, we know the
// exact number of buckets. Else choose the largest number of estimated
// reducers from the parent operators.
int numBuckets = -1;
int estimatedBuckets = -1;
+ TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) {
if (parentOp.getOpTraits().getNumBuckets() > 0) {
- numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ?
- parentOp.getOpTraits().getNumBuckets() : numBuckets;
+ numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ?
+ parentOp.getOpTraits().getNumBuckets() : numBuckets;
}
if (parentOp instanceof ReduceSinkOperator) {
ReduceSinkOperator rs = (ReduceSinkOperator)parentOp;
- estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ?
+ estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ?
rs.getConf().getNumReducers() : estimatedBuckets;
}
}
@@ -107,29 +124,80 @@ public class ConvertJoinMapJoin implemen
numBuckets = 1;
}
LOG.info("Estimated number of buckets " + numBuckets);
- int mapJoinConversionPos = mapJoinConversionPos(joinOp, context, numBuckets);
+ int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets);
if (mapJoinConversionPos < 0) {
- // we cannot convert to bucket map join, we cannot convert to
- // map join either based on the size
+ // we cannot convert to bucket map join, we cannot convert to
+ // map join either based on the size. Check if we can convert to SMB join.
+ if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
+ convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
+ return null;
+ }
+ Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
+ try {
+ bigTableMatcherClass =
+ (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar(
+ context.parseContext.getConf(),
+ HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR)));
+ } catch (ClassNotFoundException e) {
+ throw new SemanticException(e.getMessage());
+ }
+
+ BigTableSelectorForAutoSMJ bigTableMatcher =
+ ReflectionUtils.newInstance(bigTableMatcherClass, null);
+ JoinDesc joinDesc = joinOp.getConf();
+ JoinCondDesc[] joinCondns = joinDesc.getConds();
+ Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns);
+ if (joinCandidates.isEmpty()) {
+ // This is a full outer join. This can never be a map-join
+ // of any type. So return false.
+ return false;
+ }
+ mapJoinConversionPos =
+ bigTableMatcher.getBigTablePosition(context.parseContext, joinOp, joinCandidates);
+ if (mapJoinConversionPos < 0) {
+ // contains aliases from sub-query
+ // we are just converting to a common merge join operator, i.e. the
+ // equivalent of the shuffle join in the map-reduce case.
+ int pos = 0; // it doesn't matter which position we use in this case.
+ convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+ return null;
+ }
+
+ if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
+ convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
+ tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true);
+ } else {
+ // we are just converting to a common merge join operator, i.e. the
+ // equivalent of the shuffle join in the map-reduce case.
+ int pos = 0; // it doesn't matter which position we use in this case.
+ convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+ }
return null;
}
- if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
- if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos)) {
- return null;
+ if (numBuckets > 1) {
+ if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
+ if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
+ return null;
+ }
}
}
LOG.info("Convert to non-bucketed map join");
// check if we can convert to map join no bucket scaling.
- mapJoinConversionPos = mapJoinConversionPos(joinOp, context, 1);
+ mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1);
if (mapJoinConversionPos < 0) {
+ // we are just converting to a common merge join operator, i.e. the
+ // equivalent of the shuffle join in the map-reduce case.
+ int pos = 0; // it doesn't matter which position we use in this case.
+ convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
return null;
}
MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
// map join operator by default has no bucket cols
- mapJoinOp.setOpTraits(new OpTraits(null, -1));
+ mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
+ mapJoinOp.setStatistics(joinOp.getStatistics());
// propagate this change till the next RS
for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
setAllChildrenTraitsToNull(childOp);
@@ -138,11 +206,107 @@ public class ConvertJoinMapJoin implemen
return null;
}
+ // replaces the join operator with a new CommonMergeJoinOperator and, when
+ // adjustParentsChildren is set, removes the parent reduce sinks
+ private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+ int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren)
+ throws SemanticException {
+ ParseContext parseContext = context.parseContext;
+ MapJoinDesc mapJoinDesc = null;
+ if (adjustParentsChildren) {
+ mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
+ joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true);
+ } else {
+ JoinDesc joinDesc = joinOp.getConf();
+ // retain the original join desc in the map join.
+ mapJoinDesc =
+ new MapJoinDesc(null, null, joinDesc.getExprs(), null, null,
+ joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
+ joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
+ }
+
+ @SuppressWarnings("unchecked")
+ CommonMergeJoinOperator mergeJoinOp =
+ (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
+ isSubQuery, mapJoinConversionPos, mapJoinDesc));
+ OpTraits opTraits =
+ new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits()
+ .getSortCols());
+ mergeJoinOp.setOpTraits(opTraits);
+ mergeJoinOp.setStatistics(joinOp.getStatistics());
+
+ for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+ int pos = parentOp.getChildOperators().indexOf(joinOp);
+ parentOp.getChildOperators().remove(pos);
+ parentOp.getChildOperators().add(pos, mergeJoinOp);
+ }
+
+ for (Operator<? extends OperatorDesc> childOp : joinOp.getChildOperators()) {
+ int pos = childOp.getParentOperators().indexOf(joinOp);
+ childOp.getParentOperators().remove(pos);
+ childOp.getParentOperators().add(pos, mergeJoinOp);
+ }
+
+ List<Operator<? extends OperatorDesc>> childOperators = mergeJoinOp.getChildOperators();
+ if (childOperators == null) {
+ childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+ mergeJoinOp.setChildOperators(childOperators);
+ }
+
+ List<Operator<? extends OperatorDesc>> parentOperators = mergeJoinOp.getParentOperators();
+ if (parentOperators == null) {
+ parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+ mergeJoinOp.setParentOperators(parentOperators);
+ }
+
+ childOperators.clear();
+ parentOperators.clear();
+ childOperators.addAll(joinOp.getChildOperators());
+ parentOperators.addAll(joinOp.getParentOperators());
+ mergeJoinOp.getConf().setGenJoinKeys(false);
+
+ if (adjustParentsChildren) {
+ mergeJoinOp.getConf().setGenJoinKeys(true);
+ List<Operator<? extends OperatorDesc>> newParentOpList =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ for (Operator<? extends OperatorDesc> parentOp : mergeJoinOp.getParentOperators()) {
+ for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) {
+ grandParentOp.getChildOperators().remove(parentOp);
+ grandParentOp.getChildOperators().add(mergeJoinOp);
+ newParentOpList.add(grandParentOp);
+ }
+ }
+ mergeJoinOp.getParentOperators().clear();
+ mergeJoinOp.getParentOperators().addAll(newParentOpList);
+ List<Operator<? extends OperatorDesc>> parentOps =
+ new ArrayList<Operator<? extends OperatorDesc>>(mergeJoinOp.getParentOperators());
+ for (Operator<? extends OperatorDesc> parentOp : parentOps) {
+ int parentIndex = mergeJoinOp.getParentOperators().indexOf(parentOp);
+ if (parentIndex == mapJoinConversionPos) {
+ continue;
+ }
+
+ // insert the dummy store operator here
+ DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator();
+ dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+ dummyStoreOp.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+ dummyStoreOp.getChildOperators().add(mergeJoinOp);
+ int index = parentOp.getChildOperators().indexOf(mergeJoinOp);
+ parentOp.getChildOperators().remove(index);
+ parentOp.getChildOperators().add(index, dummyStoreOp);
+ dummyStoreOp.getParentOperators().add(parentOp);
+ mergeJoinOp.getParentOperators().remove(parentIndex);
+ mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp);
+ }
+ }
+ mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators());
+ }
+
private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> currentOp) {
if (currentOp instanceof ReduceSinkOperator) {
return;
}
- currentOp.setOpTraits(new OpTraits(null, -1));
+ currentOp.setOpTraits(new OpTraits(null, -1, null));
for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) {
if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) {
break;
@@ -151,28 +315,26 @@ public class ConvertJoinMapJoin implemen
}
}
- private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
- int bigTablePosition) throws SemanticException {
-
- TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
+ private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+ int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
if (!checkConvertJoinBucketMapJoin(joinOp, context, bigTablePosition, tezBucketJoinProcCtx)) {
LOG.info("Check conversion to bucket map join failed.");
return false;
}
- MapJoinOperator mapJoinOp =
- convertJoinMapJoin(joinOp, context, bigTablePosition);
+ MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition);
MapJoinDesc joinDesc = mapJoinOp.getConf();
joinDesc.setBucketMapJoin(true);
// we can set the traits for this join operator
OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(),
- tezBucketJoinProcCtx.getNumBuckets());
+ tezBucketJoinProcCtx.getNumBuckets(), null);
mapJoinOp.setOpTraits(opTraits);
+ mapJoinOp.setStatistics(joinOp.getStatistics());
setNumberOfBucketsOnChildren(mapJoinOp);
- // Once the conversion is done, we can set the partitioner to bucket cols on the small table
+ // Once the conversion is done, we can set the partitioner to bucket cols on the small table
Map<String, Integer> bigTableBucketNumMapping = new HashMap<String, Integer>();
bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(), tezBucketJoinProcCtx.getNumBuckets());
joinDesc.setBigTableBucketNumMapping(bigTableBucketNumMapping);
@@ -182,6 +344,54 @@ public class ConvertJoinMapJoin implemen
return true;
}
+ /*
+ * This method checks whether a join can be converted to an SMB join. This is
+ * done based on traits. If the sort-by columns are the same as the join
+ * columns, then we can convert the join to an SMB join. Otherwise retain the
+ * bucket map join as it is still more efficient than a regular join.
+ */
+ private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+ int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+
+ ReduceSinkOperator bigTableRS =
+ (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
+ int numBuckets = bigTableRS.getParentOperators().get(0).getOpTraits()
+ .getNumBuckets();
+
+ // the sort and bucket cols have to match on both sides for this
+ // transformation of the join operation
+ for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+ if (!(parentOp instanceof ReduceSinkOperator)) {
+ // could be mux/demux operators. Currently not supported
+ LOG.info("Found correlation optimizer operators. Cannot convert to SMB at this time.");
+ return false;
+ }
+ ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp;
+ if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp
+ .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) {
+ LOG.info("We cannot convert to SMB because the sort column names do not match.");
+ return false;
+ }
+
+ if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp
+ .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx)
+ == false) {
+ LOG.info("We cannot convert to SMB because bucket column names do not match.");
+ return false;
+ }
+ }
+
+ boolean isSubQuery = false;
+ if (numBuckets < 0) {
+ isSubQuery = true;
+ numBuckets = bigTableRS.getConf().getNumReducers();
+ }
+ tezBucketJoinProcCtx.setNumBuckets(numBuckets);
+ tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+ LOG.info("We can convert the join to an SMB join.");
+ return true;
+ }
+
private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> currentOp) {
int numBuckets = currentOp.getOpTraits().getNumBuckets();
for (Operator<? extends OperatorDesc>op : currentOp.getChildOperators()) {
@@ -193,15 +403,13 @@ public class ConvertJoinMapJoin implemen
}
/*
- * We perform the following checks to see if we can convert to a bucket map join
- * 1. If the parent reduce sink of the big table side has the same emit key cols as
- * its parent, we can create a bucket map join eliminating the reduce sink.
- * 2. If we have the table information, we can check the same way as in Mapreduce to
- * determine if we can perform a Bucket Map Join.
+ * If the parent reduce sink of the big table side has the same emit key cols
+ * as its parent, we can create a bucket map join eliminating the reduce sink.
*/
- private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp,
- OptimizeTezProcContext context, int bigTablePosition,
- TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+ private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp,
+ OptimizeTezProcContext context, int bigTablePosition,
+ TezBucketJoinProcCtx tezBucketJoinProcCtx)
+ throws SemanticException {
// bail on mux-operator because mux operator masks the emit keys of the
// constituent reduce sinks
if (!(joinOp.getParentOperators().get(0) instanceof ReduceSinkOperator)) {
@@ -211,14 +419,41 @@ public class ConvertJoinMapJoin implemen
}
ReduceSinkOperator rs = (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
+ List<List<String>> parentColNames = rs.getOpTraits().getBucketColNames();
+ Operator<? extends OperatorDesc> parentOfParent = rs.getParentOperators().get(0);
+ List<List<String>> grandParentColNames = parentOfParent.getOpTraits().getBucketColNames();
+ int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
+ // bail out unless all keys match.
+ if (checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(),
+ tezBucketJoinProcCtx) == false) {
+ LOG.info("No info available to check for bucket map join. Cannot convert");
+ return false;
+ }
+
/*
* this is the case when the big table is a sub-query and is probably
- * already bucketed by the join column in say a group by operation
+ * already bucketed by the join column in say a group by operation
*/
- List<List<String>> colNames = rs.getParentOperators().get(0).getOpTraits().getBucketColNames();
- if ((colNames != null) && (colNames.isEmpty() == false)) {
- Operator<? extends OperatorDesc>parentOfParent = rs.getParentOperators().get(0);
- for (List<String>listBucketCols : parentOfParent.getOpTraits().getBucketColNames()) {
+ boolean isSubQuery = false;
+ if (numBuckets < 0) {
+ isSubQuery = true;
+ numBuckets = rs.getConf().getNumReducers();
+ }
+ tezBucketJoinProcCtx.setNumBuckets(numBuckets);
+ tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+ return true;
+ }
+
+ private boolean checkColEquality(List<List<String>> grandParentColNames,
+ List<List<String>> parentColNames, Map<String, ExprNodeDesc> colExprMap,
+ TezBucketJoinProcCtx tezBucketJoinProcCtx) {
+
+ if ((grandParentColNames == null) || (parentColNames == null)) {
+ return false;
+ }
+
+ if ((parentColNames != null) && (parentColNames.isEmpty() == false)) {
+ for (List<String> listBucketCols : grandParentColNames) {
// can happen if this operator does not carry forward the previous bucketing columns
// for e.g. another join operator which does not carry one of the sides' key columns
if (listBucketCols.isEmpty()) {
@@ -226,9 +461,9 @@ public class ConvertJoinMapJoin implemen
}
int colCount = 0;
// parent op is guaranteed to have a single list because it is a reduce sink
- for (String colName : rs.getOpTraits().getBucketColNames().get(0)) {
+ for (String colName : parentColNames.get(0)) {
// all columns need to be at least a subset of the parentOfParent's bucket cols
- ExprNodeDesc exprNodeDesc = rs.getColumnExprMap().get(colName);
+ ExprNodeDesc exprNodeDesc = colExprMap.get(colName);
if (exprNodeDesc instanceof ExprNodeColumnDesc) {
if (((ExprNodeColumnDesc)exprNodeDesc).getColumn().equals(listBucketCols.get(colCount))) {
colCount++;
@@ -236,32 +471,21 @@ public class ConvertJoinMapJoin implemen
break;
}
}
-
- if (colCount == rs.getOpTraits().getBucketColNames().get(0).size()) {
- // all keys matched.
- int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
- boolean isSubQuery = false;
- if (numBuckets < 0) {
- isSubQuery = true;
- numBuckets = rs.getConf().getNumReducers();
- }
- tezBucketJoinProcCtx.setNumBuckets(numBuckets);
- tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+
+ if (colCount == parentColNames.get(0).size()) {
return true;
}
}
}
return false;
}
-
- LOG.info("No info available to check for bucket map join. Cannot convert");
return false;
}
- public int mapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
+ public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
int buckets) {
- Set<Integer> bigTableCandidateSet = MapJoinProcessor.
- getBigTableCandidates(joinOp.getConf().getConds());
+ Set<Integer> bigTableCandidateSet =
+ MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds());
long maxSize = context.conf.getLongVar(
HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
@@ -287,7 +511,7 @@ public class ConvertJoinMapJoin implemen
long inputSize = currInputStat.getDataSize();
if ((bigInputStat == null) ||
((bigInputStat != null) &&
- (inputSize > bigInputStat.getDataSize()))) {
+ (inputSize > bigInputStat.getDataSize()))) {
if (bigTableFound) {
// cannot convert to map join; we've already chosen a big table
@@ -347,9 +571,9 @@ public class ConvertJoinMapJoin implemen
* for tez.
*/
- public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+ public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
int bigTablePosition) throws SemanticException {
- // bail on mux operator because currently the mux operator masks the emit keys
+ // bail on mux operator because currently the mux operator masks the emit keys
// of the constituent reduce sinks.
for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
if (parentOp instanceof MuxOperator) {
@@ -359,12 +583,12 @@ public class ConvertJoinMapJoin implemen
//can safely convert the join to a map join.
ParseContext parseContext = context.parseContext;
- MapJoinOperator mapJoinOp = MapJoinProcessor.
- convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(),
- joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
+ MapJoinOperator mapJoinOp =
+ MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp,
+ parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
- Operator<? extends OperatorDesc> parentBigTableOp
- = mapJoinOp.getParentOperators().get(bigTablePosition);
+ Operator<? extends OperatorDesc> parentBigTableOp =
+ mapJoinOp.getParentOperators().get(bigTablePosition);
if (parentBigTableOp instanceof ReduceSinkOperator) {
for (Operator<?> p : parentBigTableOp.getParentOperators()) {
// we might have generated a dynamic partition operator chain. Since
@@ -380,11 +604,10 @@ public class ConvertJoinMapJoin implemen
}
}
mapJoinOp.getParentOperators().remove(bigTablePosition);
- if (!(mapJoinOp.getParentOperators().contains(
- parentBigTableOp.getParentOperators().get(0)))) {
+ if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) {
mapJoinOp.getParentOperators().add(bigTablePosition,
parentBigTableOp.getParentOperators().get(0));
- }
+ }
parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp);
for (Operator<? extends OperatorDesc> op : mapJoinOp.getParentOperators()) {
if (!(op.getChildOperators().contains(mapJoinOp))) {
@@ -397,15 +620,31 @@ public class ConvertJoinMapJoin implemen
return mapJoinOp;
}
- private boolean hasDynamicPartitionBroadcast(Operator<?> op) {
- if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
- return true;
- }
- for (Operator<?> c : op.getChildOperators()) {
- if (hasDynamicPartitionBroadcast(c)) {
- return true;
+ private boolean hasDynamicPartitionBroadcast(Operator<?> parent) {
+ boolean hasDynamicPartitionPruning = false;
+
+ for (Operator<?> op: parent.getChildOperators()) {
+ while (op != null) {
+ if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
+ // found dynamic partition pruning operator
+ hasDynamicPartitionPruning = true;
+ break;
+ }
+
+ if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) {
+ // crossing reduce sink or file sink means the pruning isn't for this parent.
+ break;
+ }
+
+ if (op.getChildOperators().size() != 1) {
+ // dynamic partition pruning pipeline doesn't have multiple children
+ break;
+ }
+
+ op = op.getChildOperators().get(0);
}
}
- return false;
+
+ return hasDynamicPartitionPruning;
}
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Tue Oct 14 19:06:45 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.optimizer;
+import com.google.common.collect.Interner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -39,8 +40,6 @@ import org.apache.hadoop.hive.ql.exec.No
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator;
-import org.apache.hadoop.hive.ql.exec.RCFileMergeOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
@@ -99,7 +98,6 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import java.io.Serializable;
@@ -578,8 +576,6 @@ public final class GenMapRedUtils {
//This read entity is a direct read entity and not an indirect read (that is when
// this is being read because it is a dependency of a view).
boolean isDirectRead = (parentViewInfo == null);
- PlanUtils.addInput(inputs,
- new ReadEntity(parseCtx.getTopToTable().get(topOp), parentViewInfo, isDirectRead));
for (Partition part : parts) {
if (part.getTable().isPartitioned()) {
@@ -873,6 +869,30 @@ public final class GenMapRedUtils {
}
}
+ public static void internTableDesc(Task<?> task, Interner<TableDesc> interner) {
+
+ if (task instanceof ConditionalTask) {
+ for (Task tsk : ((ConditionalTask) task).getListTasks()) {
+ internTableDesc(tsk, interner);
+ }
+ } else if (task instanceof ExecDriver) {
+ MapredWork work = (MapredWork) task.getWork();
+ work.getMapWork().internTable(interner);
+ } else if (task != null && (task.getWork() instanceof TezWork)) {
+ TezWork work = (TezWork)task.getWork();
+ for (BaseWork w : work.getAllWorkUnsorted()) {
+ if (w instanceof MapWork) {
+ ((MapWork)w).internTable(interner);
+ }
+ }
+ }
+ if (task.getNumChild() > 0) {
+ for (Task childTask : task.getChildTasks()) {
+ internTableDesc(childTask, interner);
+ }
+ }
+ }
+
/**
* create a new plan and return.
*
@@ -1485,7 +1505,7 @@ public final class GenMapRedUtils {
*
* @param fsInputDesc
* @param finalName
- * @param inputFormatClass
+ * @param inputFormatClass
* @return MergeWork if table is stored as RCFile or ORCFile,
* null otherwise
*/
@@ -1689,7 +1709,7 @@ public final class GenMapRedUtils {
// There are separate configuration parameters to control whether to
// merge for a map-only job
// or for a map-reduce job
- if (currTask.getWork() instanceof MapredWork) {
+ if (currTask.getWork() instanceof MapredWork) {
ReduceWork reduceWork = ((MapredWork) currTask.getWork()).getReduceWork();
boolean mergeMapOnly =
hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null;
@@ -1788,7 +1808,7 @@ public final class GenMapRedUtils {
return Collections.emptyList();
}
- public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey)
+ public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey)
throws SemanticException {
List<Path> inputPaths = new ArrayList<Path>();
switch (parseInfo.getTableSpec().specType) {
@@ -1825,6 +1845,7 @@ public final class GenMapRedUtils {
public static Set<Operator<?>> findTopOps(Operator<?> startOp, final Class<?> clazz) {
final Set<Operator<?>> operators = new LinkedHashSet<Operator<?>>();
OperatorUtils.iterateParents(startOp, new NodeUtils.Function<Operator<?>>() {
+ @Override
public void apply(Operator<?> argument) {
if (argument.getNumParent() == 0 && (clazz == null || clazz.isInstance(argument))) {
operators.add(argument);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Tue Oct 14 19:06:45 2014
@@ -332,18 +332,26 @@ public class GroupByOptimizer implements
continue;
}
- ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
- if (selectColList instanceof ExprNodeColumnDesc) {
+ ExprNodeDesc selectCol = selectDesc.getColList().get(pos);
+ if (selectCol instanceof ExprNodeColumnDesc) {
String newValue =
- tableColsMapping.get(((ExprNodeColumnDesc) selectColList).getColumn());
+ tableColsMapping.get(((ExprNodeColumnDesc) selectCol).getColumn());
tableColsMapping.put(outputColumnName, newValue);
}
else {
tableColsMapping.remove(outputColumnName);
- if ((selectColList instanceof ExprNodeConstantDesc) ||
- (selectColList instanceof ExprNodeNullDesc)) {
+ if (selectCol instanceof ExprNodeNullDesc) {
newConstantCols.add(outputColumnName);
}
+ if (selectCol instanceof ExprNodeConstantDesc) {
+ // Lets see if this constant was folded because of optimization.
+ String origCol = ((ExprNodeConstantDesc) selectCol).getFoldedFromCol();
+ if (origCol != null) {
+ tableColsMapping.put(outputColumnName, origCol);
+ } else {
+ newConstantCols.add(outputColumnName);
+ }
+ }
}
}
@@ -351,7 +359,6 @@ public class GroupByOptimizer implements
}
}
- boolean sortGroupBy = true;
// compute groupby columns from groupby keys
List<String> groupByCols = new ArrayList<String>();
// If the group by expression is anything other than a list of columns,
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Tue Oct 14 19:06:45 2014
@@ -389,157 +389,8 @@ public class MapJoinProcessor implements
JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
throws SemanticException {
- JoinDesc desc = op.getConf();
- JoinCondDesc[] condns = desc.getConds();
- Byte[] tagOrder = desc.getTagOrder();
-
- // outer join cannot be performed on a table which is being cached
- if (!noCheckOuterJoin) {
- if (checkMapJoin(mapJoinPos, condns) < 0) {
- throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
- }
- }
-
- // Walk over all the sources (which are guaranteed to be reduce sink
- // operators).
- // The join outputs a concatenation of all the inputs.
- QBJoinTree leftSrc = joinTree.getJoinSrc();
- List<ReduceSinkOperator> oldReduceSinkParentOps =
- new ArrayList<ReduceSinkOperator>(op.getNumParent());
- if (leftSrc != null) {
- // assert mapJoinPos == 0;
- Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
- assert parentOp.getParentOperators().size() == 1;
- oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
- }
-
-
- byte pos = 0;
- for (String src : joinTree.getBaseSrc()) {
- if (src != null) {
- Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
- assert parentOp.getParentOperators().size() == 1;
- oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
- }
- pos++;
- }
-
- Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
- List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
- Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
- Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
- for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
- byte tag = entry.getKey();
- Operator<?> terminal = oldReduceSinkParentOps.get(tag);
-
- List<ExprNodeDesc> values = entry.getValue();
- List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal);
- newValueExprs.put(tag, newValues);
- for (int i = 0; i < schema.size(); i++) {
- ColumnInfo column = schema.get(i);
- if (column == null) {
- continue;
- }
- ExprNodeDesc expr = colExprMap.get(column.getInternalName());
- int index = ExprNodeDescUtils.indexOf(expr, values);
- if (index >= 0) {
- colExprMap.put(column.getInternalName(), newValues.get(index));
- schema.set(i, null);
- }
- }
- }
-
- // rewrite value index for mapjoin
- Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
-
- // get the join keys from old parent ReduceSink operators
- Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
-
- // construct valueTableDescs and valueFilteredTableDescs
- List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
- List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
- int[][] filterMap = desc.getFilterMap();
- for (pos = 0; pos < op.getParentOperators().size(); pos++) {
- ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
- List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
- List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
- if (pos != mapJoinPos) {
- // remove values in key exprs for value table schema
- // value expression for hashsink will be modified in LocalMapJoinProcessor
- int[] valueIndex = new int[valueCols.size()];
- List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
- for (int i = 0; i < valueIndex.length; i++) {
- ExprNodeDesc expr = valueCols.get(i);
- int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
- if (kindex >= 0) {
- valueIndex[i] = kindex;
- } else {
- valueIndex[i] = -valueColsInValueExpr.size() - 1;
- valueColsInValueExpr.add(expr);
- }
- }
- if (needValueIndex(valueIndex)) {
- valueIndices.put(pos, valueIndex);
- }
- valueCols = valueColsInValueExpr;
- }
- // deep copy expr node desc
- List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
- if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
- ExprNodeColumnDesc isFilterDesc = new ExprNodeColumnDesc(TypeInfoFactory
- .getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", "filter", false);
- valueFilteredCols.add(isFilterDesc);
- }
-
- TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
- .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
- TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
- .getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue"));
-
- valueTableDescs.add(valueTableDesc);
- valueFilteredTableDescs.add(valueFilteredTableDesc);
-
- keyExprMap.put(pos, keyCols);
- }
-
- Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
- Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
- for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
- byte srcTag = entry.getKey();
- List<ExprNodeDesc> filter = entry.getValue();
-
- Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
- newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
- }
- desc.setFilters(filters = newFilters);
-
- // create dumpfile prefix needed to create descriptor
- String dumpFilePrefix = "";
- if( joinTree.getMapAliases() != null ) {
- for(String mapAlias : joinTree.getMapAliases()) {
- dumpFilePrefix = dumpFilePrefix + mapAlias;
- }
- dumpFilePrefix = dumpFilePrefix+"-"+PlanUtils.getCountForMapJoinDumpFilePrefix();
- } else {
- dumpFilePrefix = "mapfile"+PlanUtils.getCountForMapJoinDumpFilePrefix();
- }
-
- List<ExprNodeDesc> keyCols = keyExprMap.get((byte)mapJoinPos);
-
- List<String> outputColumnNames = op.getConf().getOutputColumnNames();
- TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf,
- PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
- JoinCondDesc[] joinCondns = op.getConf().getConds();
- MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
- valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns,
- filters, op.getConf().getNoOuterJoin(), dumpFilePrefix);
- mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
- mapJoinDescriptor.setTagOrder(tagOrder);
- mapJoinDescriptor.setNullSafes(desc.getNullSafes());
- mapJoinDescriptor.setFilterMap(desc.getFilterMap());
- if (!valueIndices.isEmpty()) {
- mapJoinDescriptor.setValueIndices(valueIndices);
- }
+ MapJoinDesc mapJoinDescriptor =
+ getMapJoinDesc(hconf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin);
// reduce sink row resolver used to generate map join op
RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
@@ -551,6 +402,7 @@ public class MapJoinProcessor implements
opParseCtxMap.put(mapJoinOp, ctx);
mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
+ Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
mapJoinOp.setColumnExprMap(colExprMap);
List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators();
@@ -1176,4 +1028,168 @@ public class MapJoinProcessor implements
}
}
+
+ public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
+ LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
+ JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
+ JoinDesc desc = op.getConf();
+ JoinCondDesc[] condns = desc.getConds();
+ Byte[] tagOrder = desc.getTagOrder();
+
+ // outer join cannot be performed on a table which is being cached
+ if (!noCheckOuterJoin) {
+ if (checkMapJoin(mapJoinPos, condns) < 0) {
+ throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+ }
+ }
+
+ // Walk over all the sources (which are guaranteed to be reduce sink
+ // operators).
+ // The join outputs a concatenation of all the inputs.
+ QBJoinTree leftSrc = joinTree.getJoinSrc();
+ List<ReduceSinkOperator> oldReduceSinkParentOps =
+ new ArrayList<ReduceSinkOperator>(op.getNumParent());
+ if (leftSrc != null) {
+ // assert mapJoinPos == 0;
+ Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
+ assert parentOp.getParentOperators().size() == 1;
+ oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
+ }
+
+ byte pos = 0;
+ for (String src : joinTree.getBaseSrc()) {
+ if (src != null) {
+ Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
+ assert parentOp.getParentOperators().size() == 1;
+ oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
+ }
+ pos++;
+ }
+
+ Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
+ List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
+ Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
+ Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
+ for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
+ byte tag = entry.getKey();
+ Operator<?> terminal = oldReduceSinkParentOps.get(tag);
+
+ List<ExprNodeDesc> values = entry.getValue();
+ List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal);
+ newValueExprs.put(tag, newValues);
+ for (int i = 0; i < schema.size(); i++) {
+ ColumnInfo column = schema.get(i);
+ if (column == null) {
+ continue;
+ }
+ ExprNodeDesc expr = colExprMap.get(column.getInternalName());
+ int index = ExprNodeDescUtils.indexOf(expr, values);
+ if (index >= 0) {
+ colExprMap.put(column.getInternalName(), newValues.get(index));
+ schema.set(i, null);
+ }
+ }
+ }
+
+ // rewrite value index for mapjoin
+ Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
+
+ // get the join keys from old parent ReduceSink operators
+ Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
+
+ // construct valueTableDescs and valueFilteredTableDescs
+ List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
+ List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
+ int[][] filterMap = desc.getFilterMap();
+ for (pos = 0; pos < op.getParentOperators().size(); pos++) {
+ ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
+ List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
+ List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
+ if (pos != mapJoinPos) {
+ // remove values in key exprs for value table schema
+ // value expression for hashsink will be modified in
+ // LocalMapJoinProcessor
+ int[] valueIndex = new int[valueCols.size()];
+ List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
+ for (int i = 0; i < valueIndex.length; i++) {
+ ExprNodeDesc expr = valueCols.get(i);
+ int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
+ if (kindex >= 0) {
+ valueIndex[i] = kindex;
+ } else {
+ valueIndex[i] = -valueColsInValueExpr.size() - 1;
+ valueColsInValueExpr.add(expr);
+ }
+ }
+ if (needValueIndex(valueIndex)) {
+ valueIndices.put(pos, valueIndex);
+ }
+ valueCols = valueColsInValueExpr;
+ }
+ // deep copy expr node desc
+ List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
+ if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
+ ExprNodeColumnDesc isFilterDesc =
+ new ExprNodeColumnDesc(
+ TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter",
+ "filter", false);
+ valueFilteredCols.add(isFilterDesc);
+ }
+
+ TableDesc valueTableDesc =
+ PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols,
+ "mapjoinvalue"));
+ TableDesc valueFilteredTableDesc =
+ PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(
+ valueFilteredCols, "mapjoinvalue"));
+
+ valueTableDescs.add(valueTableDesc);
+ valueFilteredTableDescs.add(valueFilteredTableDesc);
+
+ keyExprMap.put(pos, keyCols);
+ }
+
+ Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
+ Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
+ for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
+ byte srcTag = entry.getKey();
+ List<ExprNodeDesc> filter = entry.getValue();
+
+ Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
+ newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
+ }
+ desc.setFilters(filters = newFilters);
+
+ // create dumpfile prefix needed to create descriptor
+ String dumpFilePrefix = "";
+ if (joinTree.getMapAliases() != null) {
+ for (String mapAlias : joinTree.getMapAliases()) {
+ dumpFilePrefix = dumpFilePrefix + mapAlias;
+ }
+ dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix();
+ } else {
+ dumpFilePrefix = "mapfile" + PlanUtils.getCountForMapJoinDumpFilePrefix();
+ }
+
+ List<ExprNodeDesc> keyCols = keyExprMap.get((byte) mapJoinPos);
+
+ List<String> outputColumnNames = op.getConf().getOutputColumnNames();
+ TableDesc keyTableDesc =
+ PlanUtils.getMapJoinKeyTableDesc(hconf,
+ PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
+ JoinCondDesc[] joinCondns = op.getConf().getConds();
+ MapJoinDesc mapJoinDescriptor =
+ new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs,
+ valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op
+ .getConf().getNoOuterJoin(), dumpFilePrefix);
+ mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
+ mapJoinDescriptor.setTagOrder(tagOrder);
+ mapJoinDescriptor.setNullSafes(desc.getNullSafes());
+ mapJoinDescriptor.setFilterMap(desc.getFilterMap());
+ if (!valueIndices.isEmpty()) {
+ mapJoinDescriptor.setValueIndices(valueIndices);
+ }
+
+ return mapJoinDescriptor;
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Tue Oct 14 19:06:45 2014
@@ -51,7 +51,12 @@ public class Optimizer {
* @param hiveConf
*/
public void initialize(HiveConf hiveConf) {
+
+ boolean isTezExecEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
+ boolean bucketMapJoinOptimizer = false;
+
transformations = new ArrayList<Transform>();
+
// Add the transformation that computes the lineage information.
transformations.add(new Generator());
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
@@ -59,19 +64,23 @@ public class Optimizer {
transformations.add(new SyntheticJoinPredicate());
transformations.add(new PredicatePushDown());
transformations.add(new PartitionPruner());
+ }
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
+ transformations.add(new ConstantPropagate());
+ }
+
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
transformations.add(new PartitionConditionRemover());
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTLISTBUCKETING)) {
/* Add list bucketing pruner. */
transformations.add(new ListBucketingPruner());
}
}
+
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGROUPBY) ||
HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT)) {
transformations.add(new GroupByOptimizer());
}
- if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCONSTANTPROPAGATION)) {
- transformations.add(new ConstantPropagate());
- }
transformations.add(new ColumnPruner());
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
transformations.add(new SkewJoinOptimizer());
@@ -81,15 +90,16 @@ public class Optimizer {
}
transformations.add(new SamplePruner());
transformations.add(new MapJoinProcessor());
- boolean bucketMapJoinOptimizer = false;
- if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
+
+ if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) && !isTezExecEngine) {
transformations.add(new BucketMapJoinOptimizer());
bucketMapJoinOptimizer = true;
}
// If optimize hive.optimize.bucketmapjoin.sortedmerge is set, add both
// BucketMapJoinOptimizer and SortedMergeBucketMapJoinOptimizer
- if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
+ if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN))
+ && !isTezExecEngine) {
if (!bucketMapJoinOptimizer) {
// No need to add BucketMapJoinOptimizer twice
transformations.add(new BucketMapJoinOptimizer());
@@ -119,7 +129,7 @@ public class Optimizer {
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME) &&
- !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ !isTezExecEngine) {
transformations.add(new CorrelationOptimizer());
}
if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) {
@@ -128,8 +138,7 @@ public class Optimizer {
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) {
transformations.add(new StatsOptimizer());
}
- if (pctx.getContext().getExplain()
- && !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ if (pctx.getContext().getExplain() && !isTezExecEngine) {
transformations.add(new AnnotateWithStatistics());
transformations.add(new AnnotateWithOpTraits());
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Tue Oct 14 19:06:45 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.Ta
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
public class ReduceSinkMapJoinProc implements NodeProcessor {
@@ -183,7 +184,10 @@ public class ReduceSinkMapJoinProc imple
TezWork tezWork = context.currentTask.getWork();
LOG.debug("connecting "+parentWork.getName()+" with "+myWork.getName());
tezWork.connect(parentWork, myWork, edgeProp);
-
+ if (edgeType == EdgeType.CUSTOM_EDGE) {
+ tezWork.setVertexType(myWork, VertexType.INITIALIZED_EDGES);
+ }
+
ReduceSinkOperator r = null;
if (parentRS.getConf().getOutputName() != null) {
LOG.debug("Cloning reduce sink for multi-child broadcast edge");
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java Tue Oct 14 19:06:45 2014
@@ -44,9 +44,9 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
-import org.apache.hadoop.hive.ql.metadata.InputEstimator;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -55,13 +55,25 @@ import org.apache.hadoop.hive.ql.parse.P
import org.apache.hadoop.hive.ql.parse.QB;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.SplitSample;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToChar;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
@@ -73,9 +85,11 @@ public class SimpleFetchOptimizer implem
private final Log LOG = LogFactory.getLog(SimpleFetchOptimizer.class.getName());
+ @Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
Map<String, Operator<? extends OperatorDesc>> topOps = pctx.getTopOps();
- if (pctx.getQB().isSimpleSelectQuery() && topOps.size() == 1) {
+ if (pctx.getQB().getIsQuery() && !pctx.getQB().getParseInfo().isAnalyzeCommand()
+ && topOps.size() == 1) {
// no join, no groupby, no distinct, no lateral view, no subq,
// no CTAS or insert, not analyze command, and single sourced.
String alias = (String) pctx.getTopOps().keySet().toArray()[0];
@@ -144,7 +158,7 @@ public class SimpleFetchOptimizer implem
// for non-aggressive mode (minimal)
// 1. samping is not allowed
// 2. for partitioned table, all filters should be targeted to partition column
- // 3. SelectOperator should be select star
+ // 3. SelectOperator should use only simple cast/column access
private FetchData checkTree(boolean aggressive, ParseContext pctx, String alias,
TableScanOperator ts) throws HiveException {
SplitSample splitSample = pctx.getNameToSplitSample().get(alias);
@@ -156,7 +170,7 @@ public class SimpleFetchOptimizer implem
return null;
}
- Table table = qb.getMetaData().getAliasToTable().get(alias);
+ Table table = pctx.getTopToTable().get(ts);
if (table == null) {
return null;
}
@@ -181,34 +195,71 @@ public class SimpleFetchOptimizer implem
return null;
}
- private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggresive,
+ private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggressive,
boolean bypassFilter) {
if (ts.getChildOperators().size() != 1) {
return null;
}
Operator<?> op = ts.getChildOperators().get(0);
for (; ; op = op.getChildOperators().get(0)) {
- if (aggresive) {
- if (!(op instanceof LimitOperator || op instanceof FilterOperator
- || op instanceof SelectOperator)) {
+ if (op instanceof SelectOperator) {
+ if (!aggressive) {
+ if (!checkExpressions((SelectOperator) op)) {
+ break;
+ }
+ }
+ continue;
+ }
+
+ if (aggressive) {
+ if (!(op instanceof LimitOperator || op instanceof FilterOperator)) {
break;
}
- } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter)
- || (op instanceof SelectOperator && ((SelectOperator) op).getConf().isSelectStar()))) {
+ } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter))) {
break;
}
+
if (op.getChildOperators() == null || op.getChildOperators().size() != 1) {
return null;
}
}
+
if (op instanceof FileSinkOperator) {
fetch.scanOp = ts;
fetch.fileSink = op;
return fetch;
}
+
return null;
}
+ private boolean checkExpressions(SelectOperator op) {
+ SelectDesc desc = op.getConf();
+ for (ExprNodeDesc expr : desc.getColList()) {
+ if (!checkExpression(expr)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean checkExpression(ExprNodeDesc expr) {
+ if (expr instanceof ExprNodeConstantDesc || expr instanceof ExprNodeColumnDesc) {
+ return true;
+ }
+
+ if (expr instanceof ExprNodeGenericFuncDesc) {
+ GenericUDF udf = ((ExprNodeGenericFuncDesc) expr).getGenericUDF();
+ if (udf instanceof GenericUDFToBinary || udf instanceof GenericUDFToChar
+ || udf instanceof GenericUDFToDate || udf instanceof GenericUDFToDecimal
+ || udf instanceof GenericUDFToUnixTimeStamp || udf instanceof GenericUDFToUtcTimestamp
+ || udf instanceof GenericUDFToVarchar) {
+ return expr.getChildren().size() == 1 && checkExpression(expr.getChildren().get(0));
+ }
+ }
+ return false;
+ }
+
private class FetchData {
private final ReadEntity parent;
@@ -240,7 +291,7 @@ public class SimpleFetchOptimizer implem
this.splitSample = splitSample;
this.onlyPruningFilter = bypassFilter;
}
-
+
/*
* all filters were executed during partition pruning
*/
@@ -251,7 +302,7 @@ public class SimpleFetchOptimizer implem
private FetchWork convertToWork() throws HiveException {
inputs.clear();
if (!table.isPartitioned()) {
- inputs.add(new ReadEntity(table, parent));
+ inputs.add(new ReadEntity(table, parent, parent == null));
FetchWork work = new FetchWork(table.getPath(), Utilities.getTableDesc(table));
PlanUtils.configureInputJobPropertiesForStorageHandler(work.getTblDesc());
work.setSplitSample(splitSample);
@@ -261,12 +312,12 @@ public class SimpleFetchOptimizer implem
List<PartitionDesc> partP = new ArrayList<PartitionDesc>();
for (Partition partition : partsList.getNotDeniedPartns()) {
- inputs.add(new ReadEntity(partition, parent));
+ inputs.add(new ReadEntity(partition, parent, parent == null));
listP.add(partition.getDataLocation());
partP.add(Utilities.getPartitionDesc(partition));
}
Table sourceTable = partsList.getSourceTable();
- inputs.add(new ReadEntity(sourceTable, parent));
+ inputs.add(new ReadEntity(sourceTable, parent, parent == null));
TableDesc table = Utilities.getTableDesc(sourceTable);
FetchWork work = new FetchWork(listP, partP, table);
if (!work.getPartDesc().isEmpty()) {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Tue Oct 14 19:06:45 2014
@@ -71,7 +71,6 @@ import org.apache.hadoop.hive.ql.plan.Pl
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.io.IntWritable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -85,6 +84,7 @@ import com.google.common.collect.Maps;
*/
public class SortedDynPartitionOptimizer implements Transform {
+ private static final String BUCKET_NUMBER_COL_NAME = "_bucket_number";
@Override
public ParseContext transform(ParseContext pCtx) throws SemanticException {
@@ -216,6 +216,13 @@ public class SortedDynPartitionOptimizer
ReduceSinkDesc rsConf = getReduceSinkDesc(partitionPositions, sortPositions, sortOrder,
newValueCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
+ if (!bucketColumns.isEmpty()) {
+ String tableAlias = outRR.getColumnInfos().get(0).getTabAlias();
+ ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo,
+ tableAlias, true, true);
+ outRR.put(tableAlias, BUCKET_NUMBER_COL_NAME, ci);
+ }
+
// Create ReduceSink operator
ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
OperatorFactory.getAndMakeChild(rsConf, new RowSchema(outRR.getColumnInfos()), fsParent),
@@ -380,8 +387,11 @@ public class SortedDynPartitionOptimizer
// corresponding with bucket number and hence their OIs
for (Integer idx : keyColsPosInVal) {
if (idx < 0) {
- newKeyCols.add(new ExprNodeConstantDesc(TypeInfoFactory
- .getPrimitiveTypeInfoFromPrimitiveWritable(IntWritable.class), -1));
+ // add bucket number column to both key and value
+ ExprNodeConstantDesc encd = new ExprNodeConstantDesc(TypeInfoFactory.stringTypeInfo,
+ BUCKET_NUMBER_COL_NAME);
+ newKeyCols.add(encd);
+ newValueCols.add(encd);
} else {
newKeyCols.add(newValueCols.get(idx).clone());
}
@@ -395,7 +405,8 @@ public class SortedDynPartitionOptimizer
// should honor the ordering of records provided by ORDER BY in SELECT statement
ReduceSinkOperator parentRSOp = OperatorUtils.findSingleOperatorUpstream(parent,
ReduceSinkOperator.class);
- if (parentRSOp != null) {
+ boolean isOrderBy = parseCtx.getQB().getParseInfo().getDestToOrderBy().size() > 0;
+ if (parentRSOp != null && isOrderBy) {
String parentRSOpOrder = parentRSOp.getConf().getOrder();
if (parentRSOpOrder != null && !parentRSOpOrder.isEmpty() && sortPositions.isEmpty()) {
newKeyCols.addAll(parentRSOp.getConf().getKeyCols());
@@ -417,6 +428,9 @@ public class SortedDynPartitionOptimizer
List<String> outCols = Utilities.getInternalColumnNamesFromSignature(parent.getSchema()
.getSignature());
ArrayList<String> outValColNames = Lists.newArrayList(outCols);
+ if (!bucketColumns.isEmpty()) {
+ outValColNames.add(BUCKET_NUMBER_COL_NAME);
+ }
List<FieldSchema> valFields = PlanUtils.getFieldSchemasFromColumnList(newValueCols,
outValColNames, 0, "");
TableDesc valueTable = PlanUtils.getReduceValueTableDesc(valFields);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/StatsOptimizer.java Tue Oct 14 19:06:45 2014
@@ -193,11 +193,12 @@ public class StatsOptimizer implements T
}
SelectOperator selOp = (SelectOperator)tsOp.getChildren().get(0);
for(ExprNodeDesc desc : selOp.getConf().getColList()) {
- if (!(desc instanceof ExprNodeColumnDesc)) {
+ if (!((desc instanceof ExprNodeColumnDesc) || (desc instanceof ExprNodeConstantDesc))) {
// Probably an expression, cant handle that
return null;
}
}
+ Map<String, ExprNodeDesc> exprMap = selOp.getColumnExprMap();
// Since we have done an exact match on TS-SEL-GBY-RS-GBY-SEL-FS
// we need not to do any instanceof checks for following.
GroupByOperator gbyOp = (GroupByOperator)selOp.getChildren().get(0);
@@ -215,6 +216,12 @@ public class StatsOptimizer implements T
return null;
}
+ for(ExprNodeDesc desc : selOp.getConf().getColList()) {
+ if (!(desc instanceof ExprNodeColumnDesc)) {
+ // Probably an expression, cant handle that
+ return null;
+ }
+ }
FileSinkOperator fsOp = (FileSinkOperator)(selOp.getChildren().get(0));
if (fsOp.getChildOperators() != null && fsOp.getChildOperators().size() > 0) {
// looks like a subq plan.
@@ -236,22 +243,28 @@ public class StatsOptimizer implements T
GenericUDAFResolver udaf =
FunctionRegistry.getGenericUDAFResolver(aggr.getGenericUDAFName());
if (udaf instanceof GenericUDAFSum) {
- if(!(aggr.getParameters().get(0) instanceof ExprNodeConstantDesc)){
+ ExprNodeDesc desc = aggr.getParameters().get(0);
+ String constant;
+ if (desc instanceof ExprNodeConstantDesc) {
+ constant = ((ExprNodeConstantDesc) desc).getValue().toString();
+ } else if (desc instanceof ExprNodeColumnDesc && exprMap.get(((ExprNodeColumnDesc)desc).getColumn()) instanceof ExprNodeConstantDesc) {
+ constant = ((ExprNodeConstantDesc)exprMap.get(((ExprNodeColumnDesc)desc).getColumn())).getValue().toString();
+ } else {
return null;
}
Long rowCnt = getRowCnt(pctx, tsOp, tbl);
if(rowCnt == null) {
return null;
}
- oneRow.add(HiveDecimal.create(((ExprNodeConstantDesc) aggr.getParameters().get(0))
- .getValue().toString()).multiply(HiveDecimal.create(rowCnt)));
+ oneRow.add(HiveDecimal.create(constant).multiply(HiveDecimal.create(rowCnt)));
ois.add(PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
PrimitiveCategory.DECIMAL));
}
else if (udaf instanceof GenericUDAFCount) {
Long rowCnt = 0L;
- if ((aggr.getParameters().isEmpty() || aggr.getParameters().get(0) instanceof
- ExprNodeConstantDesc)) {
+ if (aggr.getParameters().isEmpty() || aggr.getParameters().get(0) instanceof
+ ExprNodeConstantDesc || ((aggr.getParameters().get(0) instanceof ExprNodeColumnDesc) &&
+ exprMap.get(((ExprNodeColumnDesc)aggr.getParameters().get(0)).getColumn()) instanceof ExprNodeConstantDesc)) {
// Its either count (*) or count(1) case
rowCnt = getRowCnt(pctx, tsOp, tbl);
if(rowCnt == null) {
@@ -259,12 +272,7 @@ public class StatsOptimizer implements T
}
} else {
// Its count(col) case
- if (!(aggr.getParameters().get(0) instanceof ExprNodeColumnDesc)) {
- // this is weird, we got expr or something in there, bail out
- Log.debug("Unexpected expression : " + aggr.getParameters().get(0));
- return null;
- }
- ExprNodeColumnDesc desc = (ExprNodeColumnDesc)aggr.getParameters().get(0);
+ ExprNodeColumnDesc desc = (ExprNodeColumnDesc)exprMap.get(((ExprNodeColumnDesc)aggr.getParameters().get(0)).getColumn());
String colName = desc.getColumn();
StatType type = getType(desc.getTypeString());
if(!tbl.isPartitioned()) {
@@ -330,7 +338,7 @@ public class StatsOptimizer implements T
ois.add(PrimitiveObjectInspectorFactory.
getPrimitiveJavaObjectInspector(PrimitiveCategory.LONG));
} else if (udaf instanceof GenericUDAFMax) {
- ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc)aggr.getParameters().get(0);
+ ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc)exprMap.get(((ExprNodeColumnDesc)aggr.getParameters().get(0)).getColumn());
String colName = colDesc.getColumn();
StatType type = getType(colDesc.getTypeString());
if(!tbl.isPartitioned()) {
@@ -419,7 +427,7 @@ public class StatsOptimizer implements T
}
}
} else if (udaf instanceof GenericUDAFMin) {
- ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc)aggr.getParameters().get(0);
+ ExprNodeColumnDesc colDesc = (ExprNodeColumnDesc)exprMap.get(((ExprNodeColumnDesc)aggr.getParameters().get(0)).getColumn());
String colName = colDesc.getColumn();
StatType type = getType(colDesc.getTypeString());
if (!tbl.isPartitioned()) {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java Tue Oct 14 19:06:45 2014
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Stack;
+import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -104,7 +105,12 @@ public class OpTraitsRulesProcFactory {
List<List<String>> listBucketCols = new ArrayList<List<String>>();
listBucketCols.add(bucketCols);
- OpTraits opTraits = new OpTraits(listBucketCols, -1);
+ int numBuckets = -1;
+ OpTraits parentOpTraits = rs.getParentOperators().get(0).getConf().getOpTraits();
+ if (parentOpTraits != null) {
+ numBuckets = parentOpTraits.getNumBuckets();
+ }
+ OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listBucketCols);
rs.setOpTraits(opTraits);
return null;
}
@@ -163,15 +169,21 @@ public class OpTraitsRulesProcFactory {
} catch (HiveException e) {
prunedPartList = null;
}
- boolean bucketMapJoinConvertible = checkBucketedTable(table,
+ boolean isBucketed = checkBucketedTable(table,
opTraitsCtx.getParseContext(), prunedPartList);
- List<List<String>>bucketCols = new ArrayList<List<String>>();
+ List<List<String>> bucketColsList = new ArrayList<List<String>>();
+ List<List<String>> sortedColsList = new ArrayList<List<String>>();
int numBuckets = -1;
- if (bucketMapJoinConvertible) {
- bucketCols.add(table.getBucketCols());
+ if (isBucketed) {
+ bucketColsList.add(table.getBucketCols());
numBuckets = table.getNumBuckets();
+ List<String> sortCols = new ArrayList<String>();
+ for (Order colSortOrder : table.getSortCols()) {
+ sortCols.add(colSortOrder.getCol());
+ }
+ sortedColsList.add(sortCols);
}
- OpTraits opTraits = new OpTraits(bucketCols, numBuckets);
+ OpTraits opTraits = new OpTraits(bucketColsList, numBuckets, sortedColsList);
ts.setOpTraits(opTraits);
return null;
}
@@ -197,7 +209,7 @@ public class OpTraitsRulesProcFactory {
List<List<String>> listBucketCols = new ArrayList<List<String>>();
listBucketCols.add(gbyKeys);
- OpTraits opTraits = new OpTraits(listBucketCols, -1);
+ OpTraits opTraits = new OpTraits(listBucketCols, -1, listBucketCols);
gbyOp.setOpTraits(opTraits);
return null;
}
@@ -205,22 +217,17 @@ public class OpTraitsRulesProcFactory {
public static class SelectRule implements NodeProcessor {
- @Override
- public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
- Object... nodeOutputs) throws SemanticException {
- SelectOperator selOp = (SelectOperator)nd;
- List<List<String>> parentBucketColNames =
- selOp.getParentOperators().get(0).getOpTraits().getBucketColNames();
-
+ public List<List<String>> getConvertedColNames(List<List<String>> parentColNames,
+ SelectOperator selOp) {
List<List<String>> listBucketCols = new ArrayList<List<String>>();
if (selOp.getColumnExprMap() != null) {
- if (parentBucketColNames != null) {
- for (List<String> colNames : parentBucketColNames) {
+ if (parentColNames != null) {
+ for (List<String> colNames : parentColNames) {
List<String> bucketColNames = new ArrayList<String>();
for (String colName : colNames) {
for (Entry<String, ExprNodeDesc> entry : selOp.getColumnExprMap().entrySet()) {
if (entry.getValue() instanceof ExprNodeColumnDesc) {
- if(((ExprNodeColumnDesc)(entry.getValue())).getColumn().equals(colName)) {
+ if (((ExprNodeColumnDesc) (entry.getValue())).getColumn().equals(colName)) {
bucketColNames.add(entry.getKey());
}
}
@@ -231,11 +238,34 @@ public class OpTraitsRulesProcFactory {
}
}
+ return listBucketCols;
+ }
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ SelectOperator selOp = (SelectOperator)nd;
+ List<List<String>> parentBucketColNames =
+ selOp.getParentOperators().get(0).getOpTraits().getBucketColNames();
+
+ List<List<String>> listBucketCols = null;
+ List<List<String>> listSortCols = null;
+ if (selOp.getColumnExprMap() != null) {
+ if (parentBucketColNames != null) {
+ listBucketCols = getConvertedColNames(parentBucketColNames, selOp);
+ }
+ List<List<String>> parentSortColNames = selOp.getParentOperators().get(0).getOpTraits()
+ .getSortCols();
+ if (parentSortColNames != null) {
+ listSortCols = getConvertedColNames(parentSortColNames, selOp);
+ }
+ }
+
int numBuckets = -1;
if (selOp.getParentOperators().get(0).getOpTraits() != null) {
numBuckets = selOp.getParentOperators().get(0).getOpTraits().getNumBuckets();
}
- OpTraits opTraits = new OpTraits(listBucketCols, numBuckets);
+ OpTraits opTraits = new OpTraits(listBucketCols, numBuckets, listSortCols);
selOp.setOpTraits(opTraits);
return null;
}
@@ -248,6 +278,7 @@ public class OpTraitsRulesProcFactory {
Object... nodeOutputs) throws SemanticException {
JoinOperator joinOp = (JoinOperator)nd;
List<List<String>> bucketColsList = new ArrayList<List<String>>();
+ List<List<String>> sortColsList = new ArrayList<List<String>>();
byte pos = 0;
for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
if (!(parentOp instanceof ReduceSinkOperator)) {
@@ -259,26 +290,24 @@ public class OpTraitsRulesProcFactory {
ReduceSinkRule rsRule = new ReduceSinkRule();
rsRule.process(rsOp, stack, procCtx, nodeOutputs);
}
- bucketColsList.add(getOutputColNames(joinOp, rsOp, pos));
+ bucketColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getBucketColNames(), pos));
+ sortColsList.add(getOutputColNames(joinOp, rsOp.getOpTraits().getSortCols(), pos));
pos++;
}
- joinOp.setOpTraits(new OpTraits(bucketColsList, -1));
+ joinOp.setOpTraits(new OpTraits(bucketColsList, -1, bucketColsList));
return null;
}
- private List<String> getOutputColNames(JoinOperator joinOp,
- ReduceSinkOperator rs, byte pos) {
- List<List<String>> parentBucketColNames =
- rs.getOpTraits().getBucketColNames();
-
- if (parentBucketColNames != null) {
+ private List<String> getOutputColNames(JoinOperator joinOp, List<List<String>> parentColNames,
+ byte pos) {
+ if (parentColNames != null) {
List<String> bucketColNames = new ArrayList<String>();
// guaranteed that there is only 1 list within this list because
// a reduce sink always brings down the bucketing cols to a single list.
// may not be true with correlation operators (mux-demux)
- List<String> colNames = parentBucketColNames.get(0);
+ List<String> colNames = parentColNames.get(0);
for (String colName : colNames) {
for (ExprNodeDesc exprNode : joinOp.getConf().getExprs().get(pos)) {
if (exprNode instanceof ExprNodeColumnDesc) {
@@ -317,7 +346,7 @@ public class OpTraitsRulesProcFactory {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
- OpTraits opTraits = new OpTraits(null, -1);
+ OpTraits opTraits = new OpTraits(null, -1, null);
@SuppressWarnings("unchecked")
Operator<? extends OperatorDesc> operator = (Operator<? extends OperatorDesc>)nd;
operator.setOpTraits(opTraits);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CrossProductCheck.java Tue Oct 14 19:06:45 2014
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -152,6 +154,11 @@ public class CrossProductCheck implement
private void checkMapJoins(TezWork tzWrk) throws SemanticException {
for(BaseWork wrk : tzWrk.getAllWork() ) {
+
+ if ( wrk instanceof MergeJoinWork ) {
+ wrk = ((MergeJoinWork)wrk).getMainWork();
+ }
+
List<String> warnings = new MapJoinCheck(wrk.getName()).analyze(wrk);
if ( !warnings.isEmpty() ) {
for(String w : warnings) {
@@ -163,12 +170,17 @@ public class CrossProductCheck implement
private void checkTezReducer(TezWork tzWrk) throws SemanticException {
for(BaseWork wrk : tzWrk.getAllWork() ) {
- if ( !(wrk instanceof ReduceWork) ) {
+
+ if ( wrk instanceof MergeJoinWork ) {
+ wrk = ((MergeJoinWork)wrk).getMainWork();
+ }
+
+ if ( !(wrk instanceof ReduceWork ) ) {
continue;
}
ReduceWork rWork = (ReduceWork) wrk;
Operator<? extends OperatorDesc> reducer = ((ReduceWork)wrk).getReducer();
- if ( reducer instanceof JoinOperator ) {
+ if ( reducer instanceof JoinOperator || reducer instanceof CommonMergeJoinOperator ) {
Map<Integer, ExtractReduceSinkInfo.Info> rsInfo =
new HashMap<Integer, ExtractReduceSinkInfo.Info>();
for(Map.Entry<Integer, String> e : rWork.getTagToInput().entrySet()) {
@@ -185,7 +197,7 @@ public class CrossProductCheck implement
return;
}
Operator<? extends OperatorDesc> reducer = rWrk.getReducer();
- if ( reducer instanceof JoinOperator ) {
+ if ( reducer instanceof JoinOperator|| reducer instanceof CommonMergeJoinOperator ) {
BaseWork prntWork = mrWrk.getMapWork();
checkForCrossProduct(taskName, reducer,
new ExtractReduceSinkInfo(null).analyze(prntWork));