Posted to commits@hive.apache.org by ha...@apache.org on 2014/06/05 20:28:15 UTC
svn commit: r1600719 [2/40] - in /hive/trunk:
contrib/src/test/results/clientpositive/
hbase-handler/src/test/results/positive/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
ql/src/java/org/apache/h...
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Thu Jun 5 18:28:07 2014
@@ -75,7 +75,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
import org.apache.hadoop.hive.ql.plan.SelectDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
@@ -363,7 +362,7 @@ public class MapJoinProcessor implements
// create the map-join operator
MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
- op, joinTree, mapJoinPos, noCheckOuterJoin);
+ op, joinTree, mapJoinPos, noCheckOuterJoin, false);
// remove old parents
@@ -387,8 +386,8 @@ public class MapJoinProcessor implements
public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
- JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
- throws SemanticException {
+ JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin,
+ boolean tezJoin) throws SemanticException {
JoinDesc desc = op.getConf();
JoinCondDesc[] condns = desc.getConds();
@@ -401,19 +400,17 @@ public class MapJoinProcessor implements
}
}
- Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
-
// Walk over all the sources (which are guaranteed to be reduce sink
// operators).
// The join outputs a concatenation of all the inputs.
QBJoinTree leftSrc = joinTree.getJoinSrc();
- List<Operator<? extends OperatorDesc>> oldReduceSinkParentOps =
- new ArrayList<Operator<? extends OperatorDesc>>();
+ List<ReduceSinkOperator> oldReduceSinkParentOps =
+ new ArrayList<ReduceSinkOperator>(op.getNumParent());
if (leftSrc != null) {
// assert mapJoinPos == 0;
Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
assert parentOp.getParentOperators().size() == 1;
- oldReduceSinkParentOps.add(parentOp);
+ oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
}
@@ -422,25 +419,11 @@ public class MapJoinProcessor implements
if (src != null) {
Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
assert parentOp.getParentOperators().size() == 1;
- oldReduceSinkParentOps.add(parentOp);
+ oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
}
pos++;
}
- // get the join keys from old parent ReduceSink operators
- for (pos = 0; pos < op.getParentOperators().size(); pos++) {
- ReduceSinkOperator parent = (ReduceSinkOperator) oldReduceSinkParentOps.get(pos);
- ReduceSinkDesc rsconf = parent.getConf();
- List<ExprNodeDesc> keys = rsconf.getKeyCols();
- keyExprMap.put(pos, keys);
- }
-
- List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0));
- StringBuilder keyOrder = new StringBuilder();
- for (int i = 0; i < keyCols.size(); i++) {
- keyOrder.append("+");
- }
-
Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
@@ -466,37 +449,57 @@ public class MapJoinProcessor implements
}
}
+ // rewrite value index for mapjoin
+ Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
+
+ // get the join keys from old parent ReduceSink operators
+ Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
+
// construct valueTableDescs and valueFilteredTableDescs
List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
- List<TableDesc> valueFiltedTableDescs = new ArrayList<TableDesc>();
+ List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
int[][] filterMap = desc.getFilterMap();
for (pos = 0; pos < op.getParentOperators().size(); pos++) {
- List<ExprNodeDesc> valueCols = newValueExprs.get(Byte.valueOf((byte) pos));
- int length = valueCols.size();
- List<ExprNodeDesc> valueFilteredCols = new ArrayList<ExprNodeDesc>(length);
- // deep copy expr node desc
- for (int i = 0; i < length; i++) {
- valueFilteredCols.add(valueCols.get(i).clone());
+ ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
+ List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
+ List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
+ // remove values referencing key
+      // todo: currently, mr-mapjoin stores the join operator's whole value exprs, which may contain keys
+ if (tezJoin && pos != mapJoinPos) {
+ int[] valueIndex = new int[valueCols.size()];
+ List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
+ for (int i = 0; i < valueIndex.length; i++) {
+ ExprNodeDesc expr = valueCols.get(i);
+ int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
+ if (kindex >= 0) {
+ valueIndex[i] = kindex;
+ } else {
+ valueIndex[i] = -valueColsInValueExpr.size() - 1;
+ valueColsInValueExpr.add(expr);
+ }
+ }
+ if (needValueIndex(valueIndex)) {
+ valueIndices.put(pos, valueIndex);
+ }
+ valueCols = valueColsInValueExpr;
}
+ // deep copy expr node desc
+ List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
ExprNodeColumnDesc isFilterDesc = new ExprNodeColumnDesc(TypeInfoFactory
.getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", "filter", false);
valueFilteredCols.add(isFilterDesc);
}
-
- keyOrder = new StringBuilder();
- for (int i = 0; i < valueCols.size(); i++) {
- keyOrder.append("+");
- }
-
TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
.getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
.getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue"));
valueTableDescs.add(valueTableDesc);
- valueFiltedTableDescs.add(valueFilteredTableDesc);
+ valueFilteredTableDescs.add(valueFilteredTableDesc);
+
+ keyExprMap.put(pos, keyCols);
}
Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
@@ -505,7 +508,7 @@ public class MapJoinProcessor implements
byte srcTag = entry.getKey();
List<ExprNodeDesc> filter = entry.getValue();
- Operator<?> terminal = op.getParentOperators().get(srcTag);
+ Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
}
desc.setFilters(filters = newFilters);
@@ -521,17 +524,22 @@ public class MapJoinProcessor implements
dumpFilePrefix = "mapfile"+PlanUtils.getCountForMapJoinDumpFilePrefix();
}
+ List<ExprNodeDesc> keyCols = keyExprMap.get((byte)mapJoinPos);
+
List<String> outputColumnNames = op.getConf().getOutputColumnNames();
TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf,
PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
JoinCondDesc[] joinCondns = op.getConf().getConds();
MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
- valueTableDescs, valueFiltedTableDescs, outputColumnNames, mapJoinPos, joinCondns,
+ valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns,
filters, op.getConf().getNoOuterJoin(), dumpFilePrefix);
mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
mapJoinDescriptor.setTagOrder(tagOrder);
mapJoinDescriptor.setNullSafes(desc.getNullSafes());
mapJoinDescriptor.setFilterMap(desc.getFilterMap());
+ if (!valueIndices.isEmpty()) {
+ mapJoinDescriptor.setValueIndices(valueIndices);
+ }
// reduce sink row resolver used to generate map join op
RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
@@ -558,6 +566,15 @@ public class MapJoinProcessor implements
}
+ private static boolean needValueIndex(int[] valueIndex) {
+ for (int i = 0; i < valueIndex.length; i++) {
+ if (valueIndex[i] != -i - 1) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* convert a sortmerge join to a map-side join.
*
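
The valueIndex map introduced above encodes, per value column of a tez map-join input, either a non-negative entry (the value is read from the join key at that index) or a negative entry -i-1 (the value is read from slot i of the trimmed value list), so values duplicating keys are not stored twice. A standalone sketch of the encoding and of needValueIndex, using Strings in place of ExprNodeDesc (names illustrative, not Hive's):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ValueIndexSketch {

      // mirrors needValueIndex(): the index is only worth keeping when it
      // differs from the identity mapping (-1, -2, -3, ...)
      static boolean needValueIndex(int[] valueIndex) {
        for (int i = 0; i < valueIndex.length; i++) {
          if (valueIndex[i] != -i - 1) {
            return true;
          }
        }
        return false;
      }

      // builds the index for value names against key names, collecting the
      // values that must still be stored into 'kept'
      static int[] buildIndex(List<String> keys, List<String> values, List<String> kept) {
        int[] valueIndex = new int[values.size()];
        for (int i = 0; i < valueIndex.length; i++) {
          int kindex = keys.indexOf(values.get(i));
          if (kindex >= 0) {
            valueIndex[i] = kindex;           // resolved from the join key
          } else {
            valueIndex[i] = -kept.size() - 1; // resolved from the trimmed values
            kept.add(values.get(i));
          }
        }
        return valueIndex;
      }

      public static void main(String[] args) {
        List<String> kept = new ArrayList<String>();
        int[] idx = buildIndex(Arrays.asList("a", "b"), Arrays.asList("b", "c", "a"), kept);
        System.out.println(Arrays.toString(idx) + " kept=" + kept); // [1, -1, 0] kept=[c]
        System.out.println(needValueIndex(idx));                    // true
      }
    }
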
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/CorrelationOptimizer.java Thu Jun 5 18:28:07 2014
@@ -36,7 +36,6 @@ import org.apache.hadoop.fs.ContentSumma
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
@@ -62,6 +61,7 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.optimizer.physical.CommonJoinTaskDispatcher;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -341,7 +341,6 @@ public class CorrelationOptimizer implem
IntraQueryCorrelation correlation) throws SemanticException {
LOG.info("now detecting operator " + current.getIdentifier() + " " + current.getName());
-
LinkedHashSet<ReduceSinkOperator> correlatedReduceSinkOperators =
new LinkedHashSet<ReduceSinkOperator>();
if (skipedJoinOperators.contains(current)) {
@@ -387,18 +386,18 @@ public class CorrelationOptimizer implem
ExprNodeDescUtils.backtrack(childKeyCols, child, current);
List<ExprNodeDesc> backtrackedPartitionCols =
ExprNodeDescUtils.backtrack(childPartitionCols, child, current);
+
+ OpParseContext opCtx = pCtx.getOpParseCtx().get(current);
+ RowResolver rowResolver = opCtx.getRowResolver();
Set<String> tableNeedToCheck = new HashSet<String>();
for (ExprNodeDesc expr: childKeyCols) {
if (!(expr instanceof ExprNodeColumnDesc)) {
return correlatedReduceSinkOperators;
- } else {
- String colName = ((ExprNodeColumnDesc)expr).getColumn();
- OpParseContext opCtx = pCtx.getOpParseCtx().get(current);
- for (ColumnInfo cinfo : opCtx.getRowResolver().getColumnInfos()) {
- if (colName.equals(cinfo.getInternalName())) {
- tableNeedToCheck.add(cinfo.getTabAlias());
- }
- }
+ }
+ String colName = ((ExprNodeColumnDesc)expr).getColumn();
+ String[] nm = rowResolver.reverseLookup(colName);
+ if (nm != null) {
+ tableNeedToCheck.add(nm[0]);
}
}
if (current instanceof JoinOperator) {
@@ -576,7 +575,6 @@ public class CorrelationOptimizer implem
Object... nodeOutputs) throws SemanticException {
CorrelationNodeProcCtx corrCtx = (CorrelationNodeProcCtx) ctx;
ReduceSinkOperator op = (ReduceSinkOperator) nd;
-
// Check if we have visited this operator
if (corrCtx.isWalked(op)) {
return null;
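
The CorrelationOptimizer change above replaces a scan over all ColumnInfos with RowResolver.reverseLookup, which returns the {table alias, column alias} pair recorded for an internal column name; the table alias then feeds tableNeedToCheck. A toy sketch of that lookup, with a plain map standing in for the resolver's inverse map (names illustrative):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class ReverseLookupSketch {
      public static void main(String[] args) {
        // internal name -> {table alias, column alias}, as kept by the resolver
        Map<String, String[]> invRslvMap = new HashMap<String, String[]>();
        invRslvMap.put("_col0", new String[] { "t1", "key" });
        invRslvMap.put("_col1", new String[] { "t2", "id" });

        Set<String> tableNeedToCheck = new HashSet<String>();
        for (String colName : new String[] { "_col0", "_col1" }) {
          String[] nm = invRslvMap.get(colName); // reverseLookup: O(1), no schema scan
          if (nm != null) {
            tableNeedToCheck.add(nm[0]);         // nm[0] is the table alias
          }
        }
        System.out.println(tableNeedToCheck);    // [t1, t2]
      }
    }
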
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java Thu Jun 5 18:28:07 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Gr
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -163,7 +164,7 @@ public class ReduceSinkDeDuplication imp
}
return false;
}
- if (child instanceof ExtractOperator) {
+ if (child instanceof ExtractOperator || child instanceof SelectOperator) {
return process(cRS, dedupCtx);
}
return false;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcCtx.java Thu Jun 5 18:28:07 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.optimi
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
/**
@@ -69,4 +70,8 @@ public class ExprProcCtx implements Node
public Operator<? extends OperatorDesc> getInputOperator() {
return inpOp;
}
+
+ public RowResolver getResolver() {
+ return lctx.getParseCtx().getOpParseCtx().get(inpOp).getRowResolver();
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/ExprProcFactory.java Thu Jun 5 18:28:07 2014
@@ -28,6 +28,8 @@ import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -70,20 +73,19 @@ public class ExprProcFactory {
// assert that the input operator is not null as there are no
// exprs associated with table scans.
- assert (epc.getInputOperator() != null);
+ Operator<? extends OperatorDesc> operator = epc.getInputOperator();
+ assert (operator != null);
- ColumnInfo inp_ci = null;
- for (ColumnInfo tmp_ci : epc.getInputOperator().getSchema()
- .getSignature()) {
- if (tmp_ci.getInternalName().equals(cd.getColumn())) {
- inp_ci = tmp_ci;
- break;
- }
+ RowResolver resolver = epc.getResolver();
+ String[] nm = resolver.reverseLookup(cd.getColumn());
+ if (nm == null && operator instanceof ReduceSinkOperator) {
+ nm = resolver.reverseLookup(Utilities.removeValueTag(cd.getColumn()));
}
+ ColumnInfo ci = nm != null ? resolver.get(nm[0], nm[1]): null;
// Insert the dependencies of inp_ci to that of the current operator, ci
LineageCtx lc = epc.getLineageCtx();
- Dependency dep = lc.getIndex().getDependency(epc.getInputOperator(), inp_ci);
+ Dependency dep = lc.getIndex().getDependency(operator, ci);
return dep;
}
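
The fallback above retries the lookup with the reduce-field tag stripped when the input is a ReduceSinkOperator, since RS output columns are named KEY.x or VALUE.x while the resolver may only know x. A minimal stand-in for Utilities.removeValueTag (the real helper lives in Hive's Utilities class; this sketch only assumes the tag is a dotted prefix):

    public class RemoveTagSketch {
      // hypothetical stand-in for Utilities.removeValueTag
      static String removeValueTag(String column) {
        int dot = column.indexOf('.');
        return dot >= 0 ? column.substring(dot + 1) : column;
      }

      public static void main(String[] args) {
        System.out.println(removeValueTag("VALUE._col3")); // _col3
        System.out.println(removeValueTag("_col3"));       // _col3 (unchanged)
      }
    }
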
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/lineage/OpProcFactory.java Thu Jun 5 18:28:07 2014
@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
import org.apache.hadoop.hive.ql.exec.ForwardOperator;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hive.ql.exec.Re
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.SelectOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.LineageInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.BaseColumnInfo;
import org.apache.hadoop.hive.ql.hooks.LineageInfo.Dependency;
@@ -52,6 +54,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.Utils;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.AggregationDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -213,8 +216,8 @@ public class OpProcFactory {
// Otherwise look up the expression corresponding to this ci
ExprNodeDesc expr = exprs.get(cnt++);
- lCtx.getIndex().mergeDependency(op, ci,
- ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
+ Dependency dependency = ExprProcFactory.getExprDependency(lCtx, inpOp, expr);
+ lCtx.getIndex().mergeDependency(op, ci, dependency);
}
return null;
@@ -438,7 +441,6 @@ public class OpProcFactory {
LineageCtx lCtx = (LineageCtx) procCtx;
ReduceSinkOperator rop = (ReduceSinkOperator)nd;
- ArrayList<ColumnInfo> col_infos = rop.getSchema().getSignature();
Operator<? extends OperatorDesc> inpOp = getParent(stack);
int cnt = 0;
@@ -450,15 +452,49 @@ public class OpProcFactory {
}
if (op instanceof GroupByOperator) {
+ ArrayList<ColumnInfo> col_infos = rop.getSchema().getSignature();
for(ExprNodeDesc expr : rop.getConf().getKeyCols()) {
lCtx.getIndex().putDependency(rop, col_infos.get(cnt++),
ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
}
- }
-
- for(ExprNodeDesc expr : rop.getConf().getValueCols()) {
- lCtx.getIndex().putDependency(rop, col_infos.get(cnt++),
- ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
+ for(ExprNodeDesc expr : rop.getConf().getValueCols()) {
+ lCtx.getIndex().putDependency(rop, col_infos.get(cnt++),
+ ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
+ }
+ } else if (op instanceof ExtractOperator) {
+ ArrayList<ColumnInfo> col_infos = rop.getSchema().getSignature();
+ for(ExprNodeDesc expr : rop.getConf().getValueCols()) {
+ lCtx.getIndex().putDependency(rop, col_infos.get(cnt++),
+ ExprProcFactory.getExprDependency(lCtx, inpOp, expr));
+ }
+ } else {
+ RowResolver resolver = lCtx.getParseCtx().getOpParseCtx().get(rop).getRowResolver();
+ ReduceSinkDesc desc = rop.getConf();
+ List<ExprNodeDesc> keyCols = desc.getKeyCols();
+ ArrayList<String> keyColNames = desc.getOutputKeyColumnNames();
+ for (int i = 0; i < keyCols.size(); i++) {
+ // order-bys, joins
+ String[] nm = resolver.reverseLookup(Utilities.ReduceField.KEY + "." + keyColNames.get(i));
+ if (nm == null) {
+ continue; // key in values
+ }
+ ColumnInfo column = resolver.get(nm[0], nm[1]);
+ lCtx.getIndex().putDependency(rop, column,
+ ExprProcFactory.getExprDependency(lCtx, inpOp, keyCols.get(i)));
+ }
+ List<ExprNodeDesc> valCols = desc.getValueCols();
+ ArrayList<String> valColNames = desc.getOutputValueColumnNames();
+ for (int i = 0; i < valCols.size(); i++) {
+          // todo: currently, bucketing etc. builds the RS differently from order-bys or joins
+ String[] nm = resolver.reverseLookup(valColNames.get(i));
+ if (nm == null) {
+ // order-bys, joins
+ nm = resolver.reverseLookup(Utilities.ReduceField.VALUE + "." + valColNames.get(i));
+ }
+ ColumnInfo column = resolver.get(nm[0], nm[1]);
+ lCtx.getIndex().putDependency(rop, column,
+ ExprProcFactory.getExprDependency(lCtx, inpOp, valCols.get(i)));
+ }
}
return null;
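
For ReduceSink operators feeding neither a GroupBy nor an Extract, the new branch above resolves each output value name against the row resolver directly and, failing that, with the VALUE. tag prepended (order-bys and joins register the tagged form). A toy illustration of that two-step resolution, with a map in place of the resolver (names illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class ValueNameResolutionSketch {
      public static void main(String[] args) {
        Map<String, String> resolver = new HashMap<String, String>();
        resolver.put("VALUE._col0", "src.key");   // order-by/join style registration
        resolver.put("_col1", "src.value");        // e.g. bucketing RS keeps untagged names

        for (String name : new String[] { "_col0", "_col1" }) {
          String resolved = resolver.get(name);    // try the raw output name first
          if (resolved == null) {
            resolved = resolver.get("VALUE." + name); // fall back to the tagged form
          }
          System.out.println(name + " -> " + resolved);
        }
        // _col0 -> src.key
        // _col1 -> src.value
      }
    }
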
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/BucketingSortingOpProcFactory.java Thu Jun 5 18:28:07 2014
@@ -20,7 +20,9 @@ package org.apache.hadoop.hive.ql.optimi
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -42,8 +44,11 @@ import org.apache.hadoop.hive.ql.optimiz
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
/**
* Operator factory for the rule processors for inferring bucketing/sorting columns.
@@ -129,8 +134,11 @@ public class BucketingSortingOpProcFacto
BucketingSortingCtx bctx = (BucketingSortingCtx)procCtx;
JoinOperator jop = (JoinOperator)nd;
- List<ColumnInfo> colInfos = jop.getSchema().getSignature();
- Byte[] order = jop.getConf().getTagOrder();
+ JoinDesc joinDesc = jop.getConf();
+
+ Byte[] order = joinDesc.getTagOrder();
+ Map<Byte, List<ExprNodeDesc>> expressions = joinDesc.getExprs();
+ List<String> outputValNames = joinDesc.getOutputColumnNames();
BucketCol[] newBucketCols = null;
SortCol[] newSortCols = null;
@@ -143,63 +151,55 @@ public class BucketingSortingOpProcFacto
assert(parent instanceof ReduceSinkOperator);
ReduceSinkOperator rop = (ReduceSinkOperator)jop.getParentOperators().get(i);
+ ReduceSinkDesc rsDesc = rop.getConf();
- String sortOrder = rop.getConf().getOrder();
- List<BucketCol> bucketCols = new ArrayList<BucketCol>();
- List<SortCol> sortCols = new ArrayList<SortCol>();
- // Go through the Reduce keys and find the matching column(s) in the reduce values
- for (int keyIndex = 0; keyIndex < rop.getConf().getKeyCols().size(); keyIndex++) {
- for (int valueIndex = 0; valueIndex < rop.getConf().getValueCols().size();
- valueIndex++) {
-
- if (new ExprNodeDescEqualityWrapper(rop.getConf().getValueCols().get(valueIndex)).
- equals(new ExprNodeDescEqualityWrapper(rop.getConf().getKeyCols().get(
- keyIndex)))) {
-
- String colName = rop.getSchema().getSignature().get(valueIndex).getInternalName();
- bucketCols.add(new BucketCol(colName, keyIndex));
- sortCols.add(new SortCol(colName, keyIndex, sortOrder.charAt(keyIndex)));
- break;
- }
+ byte tag = (byte) rsDesc.getTag();
+ List<ExprNodeDesc> joinValues = expressions.get(tag);
+
+ // Columns are output from the join from the different reduce sinks in the order of their
+ // offsets
+ int offset = 0;
+ for (byte orderIndex = 0; orderIndex < order.length; orderIndex++) {
+ if (order[orderIndex] < order[tag]) {
+ offset += expressions.get(orderIndex).size();
}
}
- if (bucketCols.isEmpty()) {
- assert(sortCols.isEmpty());
- continue;
- }
+ String sortOrder = rsDesc.getOrder();
+ List<ExprNodeDesc> keyCols = rsDesc.getKeyCols();
+ List<ExprNodeDesc> valCols = ExprNodeDescUtils.backtrack(joinValues, jop, parent);
if (newBucketCols == null) {
- assert(newSortCols == null);
- // The number of join keys is equal to the number of keys in every reducer, although
- // not every key may map to a value in the reducer
- newBucketCols = new BucketCol[rop.getConf().getKeyCols().size()];
- newSortCols = new SortCol[rop.getConf().getKeyCols().size()];
- } else {
- assert(newSortCols != null);
+ newBucketCols = new BucketCol[keyCols.size()];
+ newSortCols = new SortCol[keyCols.size()];
}
- byte tag = (byte)rop.getConf().getTag();
- List<ExprNodeDesc> exprs = jop.getConf().getExprs().get(tag);
-
- int colInfosOffset = 0;
- int orderValue = order[tag];
- // Columns are output from the join from the different reduce sinks in the order of their
- // offsets
- for (byte orderIndex = 0; orderIndex < order.length; orderIndex++) {
- if (order[orderIndex] < orderValue) {
- colInfosOffset += jop.getConf().getExprs().get(orderIndex).size();
+ // Go through the Reduce keys and find the matching column(s) in the reduce values
+ for (int keyIndex = 0 ; keyIndex < keyCols.size(); keyIndex++) {
+ ExprNodeDesc key = keyCols.get(keyIndex);
+ int index = ExprNodeDescUtils.indexOf(key, valCols);
+ if (index >= 0) {
+ int vindex = offset + index;
+ String vname = outputValNames.get(vindex);
+ if (newBucketCols[keyIndex] != null) {
+ newBucketCols[keyIndex].addAlias(vname, vindex);
+ newSortCols[keyIndex].addAlias(vname, vindex);
+ } else {
+ newBucketCols[keyIndex] = new BucketCol(vname, vindex);
+ newSortCols[keyIndex] = new SortCol(vname, vindex, sortOrder.charAt(keyIndex));
+ }
}
}
-
- findBucketingSortingColumns(exprs, colInfos, bucketCols, sortCols, newBucketCols,
- newSortCols, colInfosOffset);
-
}
- setBucketingColsIfComplete(bctx, jop, newBucketCols);
-
- setSortingColsIfComplete(bctx, jop, newSortCols);
+ List<BucketCol> bucketCols = Arrays.asList(newBucketCols);
+ if (!bucketCols.contains(null)) {
+ bctx.setBucketedCols(jop, bucketCols);
+ }
+ List<SortCol> sortCols = Arrays.asList(newSortCols);
+ if (!sortCols.contains(null)) {
+ bctx.setSortedCols(jop, sortCols);
+ }
return null;
}
@@ -331,6 +331,12 @@ public class BucketingSortingOpProcFacto
BucketingSortingCtx bctx = (BucketingSortingCtx)procCtx;
SelectOperator sop = (SelectOperator)nd;
+ if (sop.getNumParent() == 1 &&
+ sop.getParentOperators().get(0) instanceof ReduceSinkOperator) {
+ ReduceSinkOperator rs = (ReduceSinkOperator) sop.getParentOperators().get(0);
+ extractTraits(bctx, rs, sop);
+ return null;
+ }
Operator<? extends OperatorDesc> parent = getParent(stack);
// if this is a selStarNoCompute then this select operator
@@ -506,71 +512,83 @@ public class BucketingSortingOpProcFacto
Operator<? extends OperatorDesc> parent = exop.getParentOperators().get(0);
// The caller of this method should guarantee this
- assert(parent instanceof ReduceSinkOperator);
+ if (parent instanceof ReduceSinkOperator) {
+ extractTraits(bctx, (ReduceSinkOperator)parent, exop);
+ }
- ReduceSinkOperator rop = (ReduceSinkOperator)parent;
+ return null;
+ }
+ }
- // Go through the set of partition columns, and find their representatives in the values
- // These represent the bucketed columns
- List<BucketCol> bucketCols = new ArrayList<BucketCol>();
- for (int i = 0; i < rop.getConf().getPartitionCols().size(); i++) {
- boolean valueColFound = false;
- for (int j = 0; j < rop.getConf().getValueCols().size(); j++) {
- if (new ExprNodeDescEqualityWrapper(rop.getConf().getValueCols().get(j)).equals(
- new ExprNodeDescEqualityWrapper(rop.getConf().getPartitionCols().get(i)))) {
-
- bucketCols.add(new BucketCol(
- rop.getSchema().getSignature().get(j).getInternalName(), j));
- valueColFound = true;
- break;
- }
- }
+ static void extractTraits(BucketingSortingCtx bctx, ReduceSinkOperator rop, Operator<?> exop)
+ throws SemanticException {
- // If the partition columns can't all be found in the values then the data is not bucketed
- if (!valueColFound) {
- bucketCols.clear();
- break;
- }
- }
+ List<ExprNodeDesc> outputValues = Collections.emptyList();
+ if (exop instanceof ExtractOperator) {
+ outputValues = rop.getConf().getValueCols();
+ } else if (exop instanceof SelectOperator) {
+ SelectDesc select = ((SelectOperator)exop).getConf();
+ outputValues = ExprNodeDescUtils.backtrack(select.getColList(), exop, rop);
+ }
+ if (outputValues.isEmpty()) {
+ return;
+ }
- // Go through the set of key columns, and find their representatives in the values
- // These represent the sorted columns
- String sortOrder = rop.getConf().getOrder();
- List<SortCol> sortCols = new ArrayList<SortCol>();
- for (int i = 0; i < rop.getConf().getKeyCols().size(); i++) {
- boolean valueColFound = false;
- for (int j = 0; j < rop.getConf().getValueCols().size(); j++) {
- if (new ExprNodeDescEqualityWrapper(rop.getConf().getValueCols().get(j)).equals(
- new ExprNodeDescEqualityWrapper(rop.getConf().getKeyCols().get(i)))) {
-
- sortCols.add(new SortCol(
- rop.getSchema().getSignature().get(j).getInternalName(), j, sortOrder.charAt(i)));
- valueColFound = true;
- break;
- }
- }
+ // Go through the set of partition columns, and find their representatives in the values
+ // These represent the bucketed columns
+ List<BucketCol> bucketCols = extractBucketCols(rop, outputValues);
- // If the sorted columns can't all be found in the values then the data is only sorted on
- // the columns seen up until now
- if (!valueColFound) {
- break;
- }
- }
+ // Go through the set of key columns, and find their representatives in the values
+ // These represent the sorted columns
+ List<SortCol> sortCols = extractSortCols(rop, outputValues);
- List<ColumnInfo> colInfos = exop.getSchema().getSignature();
+ List<ColumnInfo> colInfos = exop.getSchema().getSignature();
- if (!bucketCols.isEmpty()) {
- List<BucketCol> newBucketCols = getNewBucketCols(bucketCols, colInfos);
- bctx.setBucketedCols(exop, newBucketCols);
- }
+ if (!bucketCols.isEmpty()) {
+ List<BucketCol> newBucketCols = getNewBucketCols(bucketCols, colInfos);
+ bctx.setBucketedCols(exop, newBucketCols);
+ }
- if (!sortCols.isEmpty()) {
- List<SortCol> newSortCols = getNewSortCols(sortCols, colInfos);
- bctx.setSortedCols(exop, newSortCols);
+ if (!sortCols.isEmpty()) {
+ List<SortCol> newSortCols = getNewSortCols(sortCols, colInfos);
+ bctx.setSortedCols(exop, newSortCols);
+ }
+ }
+
+ static List<BucketCol> extractBucketCols(ReduceSinkOperator rop, List<ExprNodeDesc> outputValues) {
+ List<BucketCol> bucketCols = new ArrayList<BucketCol>();
+ for (ExprNodeDesc partitionCol : rop.getConf().getPartitionCols()) {
+ if (!(partitionCol instanceof ExprNodeColumnDesc)) {
+ return Collections.emptyList();
}
+ int index = ExprNodeDescUtils.indexOf(partitionCol, outputValues);
+ if (index < 0) {
+ return Collections.emptyList();
+ }
+ bucketCols.add(new BucketCol(((ExprNodeColumnDesc) partitionCol).getColumn(), index));
+ }
+ // If the partition columns can't all be found in the values then the data is not bucketed
+ return bucketCols;
+ }
- return null;
+ static List<SortCol> extractSortCols(ReduceSinkOperator rop, List<ExprNodeDesc> outputValues) {
+ String sortOrder = rop.getConf().getOrder();
+ List<SortCol> sortCols = new ArrayList<SortCol>();
+ ArrayList<ExprNodeDesc> keyCols = rop.getConf().getKeyCols();
+ for (int i = 0; i < keyCols.size(); i++) {
+ ExprNodeDesc keyCol = keyCols.get(i);
+ if (!(keyCol instanceof ExprNodeColumnDesc)) {
+ break;
+ }
+ int index = ExprNodeDescUtils.indexOf(keyCol, outputValues);
+ if (index < 0) {
+ break;
+ }
+ sortCols.add(new SortCol(((ExprNodeColumnDesc) keyCol).getColumn(), index, sortOrder.charAt(i)));
}
+ // If the sorted columns can't all be found in the values then the data is only sorted on
+ // the columns seen up until now
+ return sortCols;
}
/**
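
The extractBucketCols/extractSortCols split above has an all-or-nothing contract for bucketing but a prefix contract for sorting: one missing partition column invalidates bucketing entirely, while sort columns are kept up to the first miss. A standalone sketch of both rules, using Strings in place of ExprNodeDesc (names illustrative):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class ExtractTraitsSketch {

      static List<String> extractBucketCols(List<String> partitionCols, List<String> outputValues) {
        List<String> bucketCols = new ArrayList<String>();
        for (String col : partitionCols) {
          if (outputValues.indexOf(col) < 0) {
            return Collections.emptyList(); // one miss invalidates bucketing entirely
          }
          bucketCols.add(col);
        }
        return bucketCols;
      }

      static List<String> extractSortCols(List<String> keyCols, List<String> outputValues) {
        List<String> sortCols = new ArrayList<String>();
        for (String col : keyCols) {
          if (outputValues.indexOf(col) < 0) {
            break; // data stays sorted only on the prefix seen so far
          }
          sortCols.add(col);
        }
        return sortCols;
      }

      public static void main(String[] args) {
        List<String> values = Arrays.asList("a", "b", "d");
        System.out.println(extractBucketCols(Arrays.asList("a", "c"), values));      // []
        System.out.println(extractSortCols(Arrays.asList("a", "c", "b"), values));   // [a]
      }
    }
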
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java Thu Jun 5 18:28:07 2014
@@ -97,24 +97,23 @@ public class RowResolver implements Seri
}
public void put(String tab_alias, String col_alias, ColumnInfo colInfo) {
+ if (!addMappingOnly(tab_alias, col_alias, colInfo)) {
+ rowSchema.getSignature().add(colInfo);
+ }
+ }
+
+ public boolean addMappingOnly(String tab_alias, String col_alias, ColumnInfo colInfo) {
if (tab_alias != null) {
tab_alias = tab_alias.toLowerCase();
}
col_alias = col_alias.toLowerCase();
- if (rowSchema.getSignature() == null) {
- rowSchema.setSignature(new ArrayList<ColumnInfo>());
- }
-
+
/*
* allow multiple mappings to the same ColumnInfo.
- * When a ColumnInfo is mapped multiple times, only the
+ * When a ColumnInfo is mapped multiple times, only the
* first inverse mapping is captured.
*/
boolean colPresent = invRslvMap.containsKey(colInfo.getInternalName());
-
- if ( !colPresent ) {
- rowSchema.getSignature().add(colInfo);
- }
LinkedHashMap<String, ColumnInfo> f_map = rslvMap.get(tab_alias);
if (f_map == null) {
@@ -127,10 +126,12 @@ public class RowResolver implements Seri
qualifiedAlias[0] = tab_alias;
qualifiedAlias[1] = col_alias;
if ( !colPresent ) {
- invRslvMap.put(colInfo.getInternalName(), qualifiedAlias);
+ invRslvMap.put(colInfo.getInternalName(), qualifiedAlias);
} else {
altInvRslvMap.put(colInfo.getInternalName(), qualifiedAlias);
}
+
+ return colPresent;
}
public boolean hasTableAlias(String tab_alias) {
@@ -350,18 +351,4 @@ public class RowResolver implements Seri
this.expressionMap = expressionMap;
}
- public String[] toColumnDesc() {
- StringBuilder cols = new StringBuilder();
- StringBuilder colTypes = new StringBuilder();
-
- for (ColumnInfo colInfo : getColumnInfos()) {
- if (cols.length() > 0) {
- cols.append(',');
- colTypes.append(':');
- }
- cols.append(colInfo.getInternalName());
- colTypes.append(colInfo.getType().getTypeName());
- }
- return new String[] {cols.toString(), colTypes.toString()};
- }
}
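
The RowResolver refactoring above splits put() into two concerns: addMappingOnly records the alias mapping and reports whether the internal name was already mapped, while put() additionally appends first-time columns to the row schema. A simplified sketch of that contract (Strings in place of ColumnInfo):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class RowResolverSketch {
      private final List<String> rowSchema = new ArrayList<String>();
      private final Map<String, String[]> invRslvMap = new HashMap<String, String[]>();

      public void put(String tabAlias, String colAlias, String internalName) {
        if (!addMappingOnly(tabAlias, colAlias, internalName)) {
          rowSchema.add(internalName); // only the first mapping adds a schema column
        }
      }

      public boolean addMappingOnly(String tabAlias, String colAlias, String internalName) {
        boolean colPresent = invRslvMap.containsKey(internalName);
        if (!colPresent) {
          invRslvMap.put(internalName, new String[] { tabAlias, colAlias });
        }
        return colPresent;
      }

      public static void main(String[] args) {
        RowResolverSketch rr = new RowResolverSketch();
        rr.put("t1", "key", "_col0");
        rr.addMappingOnly("t2", "k", "_col1"); // mapping without widening the schema
        rr.put("t1", "key2", "_col0");         // duplicate: mapping only
        System.out.println(rr.rowSchema);      // [_col0]
      }
    }
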
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Thu Jun 5 18:28:07 2014
@@ -2708,7 +2708,7 @@ public class SemanticAnalyzer extends Ba
fetchFilesNotInLocalFilesystem(stripQuotes(trfm.getChild(execPos).getText())),
inInfo, inRecordWriter, outInfo, outRecordReader, errRecordReader, errInfo),
new RowSchema(out_rwsch.getColumnInfos()), input), out_rwsch);
- output.setColumnExprMap(Collections.emptyMap()); // disable backtracking
+ output.setColumnExprMap(new HashMap<String, ExprNodeDesc>()); // disable backtracking
return output;
}
@@ -3117,15 +3117,18 @@ public class SemanticAnalyzer extends Ba
startPosn = 0;
}
+ Set<String> colAliases = new HashSet<String>();
+ ASTNode[] exprs = new ASTNode[exprList.getChildCount()];
+ String[][] aliases = new String[exprList.getChildCount()][];
+ boolean[] hasAsClauses = new boolean[exprList.getChildCount()];
// Iterate over all expression (either after SELECT, or in SELECT TRANSFORM)
for (int i = startPosn; i < exprList.getChildCount(); ++i) {
// child can be EXPR AS ALIAS, or EXPR.
ASTNode child = (ASTNode) exprList.getChild(i);
boolean hasAsClause = (!isInTransform) && (child.getChildCount() == 2);
- boolean isWindowSpec = child.getChildCount() == 3 ?
- (child.getChild(2).getType() == HiveParser.TOK_WINDOWSPEC) :
- false;
+ boolean isWindowSpec = child.getChildCount() == 3 &&
+ child.getChild(2).getType() == HiveParser.TOK_WINDOWSPEC;
// EXPR AS (ALIAS,...) parses, but is only allowed for UDTF's
// This check is not needed and invalid when there is a transform b/c the
@@ -3156,8 +3159,20 @@ public class SemanticAnalyzer extends Ba
unparseTranslator.addIdentifierTranslation((ASTNode) child
.getChild(1));
}
-
}
+ exprs[i] = expr;
+ aliases[i] = new String[] {tabAlias, colAlias};
+ hasAsClauses[i] = hasAsClause;
+ colAliases.add(colAlias);
+ }
+
+ // Iterate over all expression (either after SELECT, or in SELECT TRANSFORM)
+ for (int i = startPosn; i < exprList.getChildCount(); ++i) {
+ // The real expression
+ ASTNode expr = exprs[i];
+ String tabAlias = aliases[i][0];
+ String colAlias = aliases[i][1];
+ boolean hasAsClause = hasAsClauses[i];
if (expr.getType() == HiveParser.TOK_ALLCOLREF) {
pos = genColListRegex(".*", expr.getChildCount() == 0 ? null
@@ -3193,7 +3208,8 @@ public class SemanticAnalyzer extends Ba
tcCtx.setAllowDistinctFunctions(false);
ExprNodeDesc exp = genExprNodeDesc(expr, inputRR, tcCtx);
String recommended = recommendName(exp, colAlias);
- if (recommended != null && out_rwsch.get(null, recommended) == null) {
+ if (recommended != null && !colAliases.contains(recommended) &&
+ out_rwsch.get(null, recommended) == null) {
colAlias = recommended;
}
col_list.add(exp);
@@ -4726,7 +4742,7 @@ public class SemanticAnalyzer extends Ba
* distinct key in hope of getting a uniform distribution, and
* compute partial aggregates by the grouping key. Evaluate partial
* aggregates first, and spray by the grouping key to compute actual
- * aggregates in the second phase. The agggregation evaluation
+ * aggregates in the second phase. The aggregation evaluation
* functions are as follows: Partitioning Key: distinct key
*
* Sorting Key: distinct key
@@ -4794,7 +4810,7 @@ public class SemanticAnalyzer extends Ba
* compute partial aggregates grouped by the reduction key (grouping
* key + distinct key). Evaluate partial aggregates first, and spray
* by the grouping key to compute actual aggregates in the second
- * phase. The agggregation evaluation functions are as follows:
+ * phase. The aggregation evaluation functions are as follows:
* Partitioning Key: random() if no DISTINCT grouping + distinct key
* if DISTINCT
*
@@ -4968,7 +4984,7 @@ public class SemanticAnalyzer extends Ba
* spray by the group by key, and sort by the distinct key (if any), and
* compute aggregates based on actual aggregates
*
- * The agggregation evaluation functions are as follows:
+ * The aggregation evaluation functions are as follows:
*
* No grouping sets:
* Group By Operator:
@@ -5135,7 +5151,7 @@ public class SemanticAnalyzer extends Ba
* key). Evaluate partial aggregates first, and spray by the grouping key to
* compute actual aggregates in the second phase.
*
- * The agggregation evaluation functions are as follows:
+ * The aggregation evaluation functions are as follows:
*
* No grouping sets:
* STAGE 1
@@ -5157,7 +5173,7 @@ public class SemanticAnalyzer extends Ba
* Sorting Key: grouping key
* Reducer: merge/terminate (mode = FINAL)
*
- * In the presence of grouping sets, the agggregation evaluation functions are as follows:
+ * In the presence of grouping sets, the aggregation evaluation functions are as follows:
* STAGE 1
* Group by Operator:
* grouping keys: group by expressions + grouping id. if no DISTINCT
@@ -5406,8 +5422,8 @@ public class SemanticAnalyzer extends Ba
if ((dest_tab.getNumBuckets() > 0) &&
(conf.getBoolVar(HiveConf.ConfVars.HIVEENFORCEBUCKETING))) {
enforceBucketing = true;
- partnCols = getParitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
- partnColsNoConvert = getParitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input,
+ partnCols = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input, true);
+ partnColsNoConvert = getPartitionColsFromBucketCols(dest, qb, dest_tab, table_desc, input,
false);
}
@@ -6205,7 +6221,7 @@ public class SemanticAnalyzer extends Ba
return genLimitPlan(dest, qb, curr, limit);
}
- private ArrayList<ExprNodeDesc> getParitionColsFromBucketCols(String dest, QB qb, Table tab,
+ private ArrayList<ExprNodeDesc> getPartitionColsFromBucketCols(String dest, QB qb, Table tab,
TableDesc table_desc, Operator input, boolean convert)
throws SemanticException {
List<String> tabBucketCols = tab.getBucketCols();
@@ -6384,7 +6400,7 @@ public class SemanticAnalyzer extends Ba
}
@SuppressWarnings("nls")
- private Operator genReduceSinkPlan(String dest, QB qb, Operator input,
+ private Operator genReduceSinkPlan(String dest, QB qb, Operator<?> input,
int numReducers) throws SemanticException {
RowResolver inputRR = opParseCtx.get(input).getRowResolver();
@@ -6424,8 +6440,11 @@ public class SemanticAnalyzer extends Ba
}
}
}
+ Operator dummy = Operator.createDummy();
+ dummy.setParentOperators(Arrays.asList(input));
ArrayList<ExprNodeDesc> sortCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> sortColsBack = new ArrayList<ExprNodeDesc>();
StringBuilder order = new StringBuilder();
if (sortExprs != null) {
int ccount = sortExprs.getChildCount();
@@ -6446,58 +6465,121 @@ public class SemanticAnalyzer extends Ba
}
ExprNodeDesc exprNode = genExprNodeDesc(cl, inputRR);
sortCols.add(exprNode);
+ sortColsBack.add(ExprNodeDescUtils.backtrack(exprNode, dummy, input));
}
}
-
// For the generation of the values expression just get the inputs
// signature and generate field expressions for those
+ RowResolver rsRR = new RowResolver();
ArrayList<String> outputColumns = new ArrayList<String>();
- Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
ArrayList<ExprNodeDesc> valueCols = new ArrayList<ExprNodeDesc>();
- int i = 0;
- for (ColumnInfo colInfo : inputRR.getColumnInfos()) {
- String internalName = getColumnInternalName(i++);
- outputColumns.add(internalName);
- valueCols.add(new ExprNodeColumnDesc(colInfo.getType(), colInfo
- .getInternalName(), colInfo.getTabAlias(), colInfo
- .getIsVirtualCol()));
- colExprMap.put(internalName, valueCols.get(valueCols.size() - 1));
- }
+ ArrayList<ExprNodeDesc> valueColsBack = new ArrayList<ExprNodeDesc>();
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
- Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(PlanUtils
- .getReduceSinkDesc(sortCols, valueCols, outputColumns, false, -1,
- partitionCols, order.toString(), numReducers),
- new RowSchema(inputRR.getColumnInfos()), input), inputRR);
- interim.setColumnExprMap(colExprMap);
+ ArrayList<ColumnInfo> columnInfos = inputRR.getColumnInfos();
- // Add the extract operator to get the value fields
- RowResolver out_rwsch = new RowResolver();
- RowResolver interim_rwsch = inputRR;
- Integer pos = Integer.valueOf(0);
- for (ColumnInfo colInfo : interim_rwsch.getColumnInfos()) {
- String[] info = interim_rwsch.reverseLookup(colInfo.getInternalName());
- out_rwsch.put(info[0], info[1], new ColumnInfo(
- getColumnInternalName(pos), colInfo.getType(), info[0],
- colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol()));
- pos = Integer.valueOf(pos.intValue() + 1);
+ int[] index = new int[columnInfos.size()];
+ for (int i = 0; i < index.length; i++) {
+ ColumnInfo colInfo = columnInfos.get(i);
+ String[] nm = inputRR.reverseLookup(colInfo.getInternalName());
+ String[] nm2 = inputRR.getAlternateMappings(colInfo.getInternalName());
+ ExprNodeColumnDesc value = new ExprNodeColumnDesc(colInfo.getType(),
+ colInfo.getInternalName(), colInfo.getTabAlias(), colInfo.getIsVirtualCol());
+
+ // backtrack can be null when input is script operator
+ ExprNodeDesc valueBack = ExprNodeDescUtils.backtrack(value, dummy, input);
+ int kindex = valueBack == null ? -1 : ExprNodeDescUtils.indexOf(valueBack, sortColsBack);
+ if (kindex >= 0) {
+ index[i] = kindex;
+ ColumnInfo newColInfo = new ColumnInfo(colInfo);
+ newColInfo.setInternalName(Utilities.ReduceField.KEY + ".reducesinkkey" + kindex);
+ newColInfo.setTabAlias(nm[0]);
+ rsRR.addMappingOnly(nm[0], nm[1], newColInfo);
+ if (nm2 != null) {
+ rsRR.addMappingOnly(nm2[0], nm2[1], newColInfo);
+ }
+ continue;
+ }
+ int vindex = valueBack == null ? -1 : ExprNodeDescUtils.indexOf(valueBack, valueColsBack);
+ if (vindex >= 0) {
+ index[i] = -vindex - 1;
+ continue;
+ }
+ index[i] = -valueCols.size() - 1;
+ String outputColName = getColumnInternalName(valueCols.size());
+
+ valueCols.add(value);
+ valueColsBack.add(valueBack);
+
+ ColumnInfo newColInfo = new ColumnInfo(colInfo);
+ newColInfo.setInternalName(Utilities.ReduceField.VALUE + "." + outputColName);
+ newColInfo.setTabAlias(nm[0]);
+
+ rsRR.put(nm[0], nm[1], newColInfo);
+ if (nm2 != null) {
+ rsRR.addMappingOnly(nm2[0], nm2[1], newColInfo);
+ }
+ outputColumns.add(outputColName);
}
- Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
- new ExtractDesc(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
- Utilities.ReduceField.VALUE.toString(), "", false)), new RowSchema(
- out_rwsch.getColumnInfos()), interim), out_rwsch);
+ dummy.setParentOperators(null);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created ReduceSink Plan for clause: " + dest + " row schema: "
- + out_rwsch.toString());
+ ReduceSinkDesc rsdesc = PlanUtils.getReduceSinkDesc(sortCols, valueCols, outputColumns,
+ false, -1, partitionCols, order.toString(), numReducers);
+ Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(rsdesc,
+ new RowSchema(rsRR.getColumnInfos()), input), rsRR);
+
+ List<String> keyColNames = rsdesc.getOutputKeyColumnNames();
+ for (int i = 0 ; i < keyColNames.size(); i++) {
+ colExprMap.put(Utilities.ReduceField.KEY + "." + keyColNames.get(i), sortCols.get(i));
+ }
+ List<String> valueColNames = rsdesc.getOutputValueColumnNames();
+ for (int i = 0 ; i < valueColNames.size(); i++) {
+ colExprMap.put(Utilities.ReduceField.VALUE + "." + valueColNames.get(i), valueCols.get(i));
}
+ interim.setColumnExprMap(colExprMap);
+
+ RowResolver selectRR = new RowResolver();
+ ArrayList<ExprNodeDesc> selCols = new ArrayList<ExprNodeDesc>();
+ ArrayList<String> selOutputCols = new ArrayList<String>();
+ Map<String, ExprNodeDesc> selColExprMap = new HashMap<String, ExprNodeDesc>();
+
+ for (int i = 0; i < index.length; i++) {
+ ColumnInfo prev = columnInfos.get(i);
+ String[] nm = inputRR.reverseLookup(prev.getInternalName());
+ String[] nm2 = inputRR.getAlternateMappings(prev.getInternalName());
+ ColumnInfo info = new ColumnInfo(prev);
+
+ String field;
+ if (index[i] >= 0) {
+ field = Utilities.ReduceField.KEY + "." + keyColNames.get(index[i]);
+ } else {
+ field = Utilities.ReduceField.VALUE + "." + valueColNames.get(-index[i] - 1);
+ }
+ String internalName = getColumnInternalName(i);
+ ExprNodeColumnDesc desc = new ExprNodeColumnDesc(info.getType(),
+ field, info.getTabAlias(), info.getIsVirtualCol());
+ selCols.add(desc);
+
+ info.setInternalName(internalName);
+ selectRR.put(nm[0], nm[1], info);
+ if (nm2 != null) {
+ selectRR.addMappingOnly(nm2[0], nm2[1], info);
+ }
+ selOutputCols.add(internalName);
+ selColExprMap.put(internalName, desc);
+ }
+ SelectDesc select = new SelectDesc(selCols, selOutputCols);
+ Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(select,
+ new RowSchema(selectRR.getColumnInfos()), interim), selectRR);
+ output.setColumnExprMap(selColExprMap);
return output;
}
private Operator genJoinOperatorChildren(QBJoinTree join, Operator left,
Operator[] right, HashSet<Integer> omitOpts) throws SemanticException {
- RowResolver outputRS = new RowResolver();
+ RowResolver outputRR = new RowResolver();
ArrayList<String> outputColumnNames = new ArrayList<String>();
// all children are base classes
Operator<?>[] rightOps = new Operator[right.length];
@@ -6511,56 +6593,73 @@ public class SemanticAnalyzer extends Ba
new HashMap<Byte, List<ExprNodeDesc>>();
for (int pos = 0; pos < right.length; ++pos) {
-
- Operator input = right[pos];
+ Operator<?> input = right[pos] == null ? left : right[pos];
if (input == null) {
input = left;
}
+ ReduceSinkOperator rs = (ReduceSinkOperator) input;
+ if (rs.getNumParent() != 1) {
+ throw new SemanticException("RS should have single parent");
+ }
+ Operator<?> parent = rs.getParentOperators().get(0);
+ ReduceSinkDesc rsDesc = (ReduceSinkDesc) (input.getConf());
- ArrayList<ExprNodeDesc> keyDesc = new ArrayList<ExprNodeDesc>();
+ int[] index = rs.getValueIndex();
+
+ ArrayList<ExprNodeDesc> valueDesc = new ArrayList<ExprNodeDesc>();
ArrayList<ExprNodeDesc> filterDesc = new ArrayList<ExprNodeDesc>();
- Byte tag = Byte.valueOf((byte) (((ReduceSinkDesc) (input.getConf()))
- .getTag()));
+ Byte tag = (byte) rsDesc.getTag();
// check whether this input operator produces output
- if (omitOpts == null || !omitOpts.contains(pos)) {
- // prepare output descriptors for the input opt
- RowResolver inputRS = opParseCtx.get(input).getRowResolver();
- Iterator<String> keysIter = inputRS.getTableNames().iterator();
- Set<String> aliases = posToAliasMap.get(pos);
- if (aliases == null) {
- aliases = new HashSet<String>();
- posToAliasMap.put(pos, aliases);
- }
- while (keysIter.hasNext()) {
- String key = keysIter.next();
- aliases.add(key);
- HashMap<String, ColumnInfo> map = inputRS.getFieldMap(key);
- Iterator<String> fNamesIter = map.keySet().iterator();
- while (fNamesIter.hasNext()) {
- String field = fNamesIter.next();
- ColumnInfo valueInfo = inputRS.get(key, field);
- keyDesc.add(new ExprNodeColumnDesc(valueInfo.getType(), valueInfo
- .getInternalName(), valueInfo.getTabAlias(), valueInfo
- .getIsVirtualCol()));
+ if (omitOpts != null && omitOpts.contains(pos)) {
+ exprMap.put(tag, valueDesc);
+ filterMap.put(tag, filterDesc);
+ rightOps[pos] = input;
+ continue;
+ }
- if (outputRS.get(key, field) == null) {
- String colName = getColumnInternalName(outputPos);
- outputPos++;
- outputColumnNames.add(colName);
- colExprMap.put(colName, keyDesc.get(keyDesc.size() - 1));
- outputRS.put(key, field, new ColumnInfo(colName, valueInfo
- .getType(), key, valueInfo.getIsVirtualCol(), valueInfo
- .isHiddenVirtualCol()));
- reversedExprs.put(colName, tag);
- }
- }
+ List<String> keyColNames = rsDesc.getOutputKeyColumnNames();
+ List<String> valColNames = rsDesc.getOutputValueColumnNames();
+
+ // prepare output descriptors for the input opt
+ RowResolver inputRR = opParseCtx.get(input).getRowResolver();
+ RowResolver parentRR = opParseCtx.get(parent).getRowResolver();
+ posToAliasMap.put(pos, new HashSet<String>(inputRR.getTableNames()));
+
+ List<ColumnInfo> columns = parentRR.getColumnInfos();
+ for (int i = 0; i < index.length; i++) {
+ ColumnInfo prev = columns.get(i);
+ String[] nm = parentRR.reverseLookup(prev.getInternalName());
+ String[] nm2 = parentRR.getAlternateMappings(prev.getInternalName());
+ if (outputRR.get(nm[0], nm[1]) != null) {
+ continue;
+ }
+ ColumnInfo info = new ColumnInfo(prev);
+ String field;
+ if (index[i] >= 0) {
+ field = Utilities.ReduceField.KEY + "." + keyColNames.get(index[i]);
+ } else {
+ field = Utilities.ReduceField.VALUE + "." + valColNames.get(-index[i] - 1);
}
- for (ASTNode cond : join.getFilters().get(tag)) {
- filterDesc.add(genExprNodeDesc(cond, inputRS));
+ String internalName = getColumnInternalName(outputColumnNames.size());
+ ExprNodeColumnDesc desc = new ExprNodeColumnDesc(info.getType(),
+ field, info.getTabAlias(), info.getIsVirtualCol());
+
+ info.setInternalName(internalName);
+ colExprMap.put(internalName, desc);
+ outputRR.put(nm[0], nm[1], info);
+ if (nm2 != null) {
+ outputRR.addMappingOnly(nm2[0], nm2[1], info);
}
+
+ valueDesc.add(desc);
+ outputColumnNames.add(internalName);
+ reversedExprs.put(internalName, tag);
}
- exprMap.put(tag, keyDesc);
+ for (ASTNode cond : join.getFilters().get(tag)) {
+ filterDesc.add(genExprNodeDesc(cond, inputRR));
+ }
+ exprMap.put(tag, valueDesc);
filterMap.put(tag, filterDesc);
rightOps[pos] = input;
}
@@ -6577,7 +6676,7 @@ public class SemanticAnalyzer extends Ba
desc.setFilterMap(join.getFilterMap());
JoinOperator joinOp = (JoinOperator) OperatorFactory.getAndMakeChild(desc,
- new RowSchema(outputRS.getColumnInfos()), rightOps);
+ new RowSchema(outputRR.getColumnInfos()), rightOps);
joinOp.setColumnExprMap(colExprMap);
joinOp.setPosToAliasMap(posToAliasMap);
@@ -6588,51 +6687,93 @@ public class SemanticAnalyzer extends Ba
}
desc.setNullSafes(nullsafes);
}
- return putOpInsertMap(joinOp, outputRS);
+ return putOpInsertMap(joinOp, outputRR);
+ }
+
+ private ExprNodeDesc[][] genJoinKeys(QBJoinTree joinTree, Operator[] inputs)
+ throws SemanticException {
+ ExprNodeDesc[][] joinKeys = new ExprNodeDesc[inputs.length][];
+ for (int i = 0; i < inputs.length; i++) {
+ RowResolver inputRR = opParseCtx.get(inputs[i]).getRowResolver();
+ List<ASTNode> expressions = joinTree.getExpressions().get(i);
+ joinKeys[i] = new ExprNodeDesc[expressions.size()];
+ for (int j = 0; j < joinKeys[i].length; j++) {
+ joinKeys[i][j] = genExprNodeDesc(expressions.get(j), inputRR);
+ }
+ }
+ // Type checking and implicit type conversion for join keys
+ return genJoinOperatorTypeCheck(joinKeys);
}
@SuppressWarnings("nls")
- private Operator genJoinReduceSinkChild(QB qb, QBJoinTree joinTree,
- Operator child, String[] srcs, int pos) throws SemanticException {
- RowResolver inputRS = opParseCtx.get(child).getRowResolver();
- RowResolver outputRS = new RowResolver();
+ private Operator genJoinReduceSinkChild(QB qb, ExprNodeDesc[] joinKeys,
+ Operator<?> child, String[] srcs, int tag) throws SemanticException {
+
+ Operator dummy = Operator.createDummy(); // dummy for backtracking
+ dummy.setParentOperators(Arrays.asList(child));
+
+ RowResolver inputRR = opParseCtx.get(child).getRowResolver();
+ RowResolver outputRR = new RowResolver();
ArrayList<String> outputColumns = new ArrayList<String>();
ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
+ ArrayList<ExprNodeDesc> reduceKeysBack = new ArrayList<ExprNodeDesc>();
// Compute join keys and store in reduceKeys
- ArrayList<ASTNode> exprs = joinTree.getExpressions().get(pos);
- for (int i = 0; i < exprs.size(); i++) {
- ASTNode expr = exprs.get(i);
- reduceKeys.add(genExprNodeDesc(expr, inputRS));
+ for (ExprNodeDesc joinKey : joinKeys) {
+ reduceKeys.add(joinKey);
+ reduceKeysBack.add(ExprNodeDescUtils.backtrack(joinKey, dummy, child));
}
// Walk over the input row resolver and copy in the output
ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
- Iterator<String> tblNamesIter = inputRS.getTableNames().iterator();
+ ArrayList<ExprNodeDesc> reduceValuesBack = new ArrayList<ExprNodeDesc>();
Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
- while (tblNamesIter.hasNext()) {
- String src = tblNamesIter.next();
- HashMap<String, ColumnInfo> fMap = inputRS.getFieldMap(src);
- for (Map.Entry<String, ColumnInfo> entry : fMap.entrySet()) {
- String field = entry.getKey();
- ColumnInfo valueInfo = entry.getValue();
- ExprNodeColumnDesc inputExpr = new ExprNodeColumnDesc(valueInfo
- .getType(), valueInfo.getInternalName(), valueInfo.getTabAlias(),
- valueInfo.getIsVirtualCol());
- reduceValues.add(inputExpr);
- if (outputRS.get(src, field) == null) {
- String col = getColumnInternalName(reduceValues.size() - 1);
- outputColumns.add(col);
- ColumnInfo newColInfo = new ColumnInfo(Utilities.ReduceField.VALUE
- .toString()
- + "." + col, valueInfo.getType(), src, valueInfo
- .getIsVirtualCol(), valueInfo.isHiddenVirtualCol());
- colExprMap.put(newColInfo.getInternalName(), inputExpr);
- outputRS.put(src, field, newColInfo);
+ List<ColumnInfo> columns = inputRR.getColumnInfos();
+ int[] index = new int[columns.size()];
+ for (int i = 0; i < columns.size(); i++) {
+ ColumnInfo colInfo = columns.get(i);
+ String[] nm = inputRR.reverseLookup(colInfo.getInternalName());
+ String[] nm2 = inputRR.getAlternateMappings(colInfo.getInternalName());
+ ExprNodeDesc expr = new ExprNodeColumnDesc(colInfo.getType(),
+ colInfo.getInternalName(), colInfo.getTabAlias(), colInfo.getIsVirtualCol());
+
+ // backtrack can be null when the input is a script operator
+ ExprNodeDesc exprBack = ExprNodeDescUtils.backtrack(expr, dummy, child);
+ int kindex = exprBack == null ? -1 : ExprNodeDescUtils.indexOf(exprBack, reduceKeysBack);
+ if (kindex >= 0) {
+ ColumnInfo newColInfo = new ColumnInfo(colInfo);
+ newColInfo.setInternalName(Utilities.ReduceField.KEY + ".reducesinkkey" + kindex);
+ newColInfo.setTabAlias(nm[0]);
+ outputRR.addMappingOnly(nm[0], nm[1], newColInfo);
+ if (nm2 != null) {
+ outputRR.addMappingOnly(nm2[0], nm2[1], newColInfo);
}
+ index[i] = kindex;
+ continue;
}
+ int vindex = exprBack == null ? -1 : ExprNodeDescUtils.indexOf(exprBack, reduceValuesBack);
+ if (vindex >= 0) {
+ index[i] = -vindex - 1;
+ continue;
+ }
+ index[i] = -reduceValues.size() - 1;
+ String outputColName = getColumnInternalName(reduceValues.size());
+
+ reduceValues.add(expr);
+ reduceValuesBack.add(exprBack);
+
+ ColumnInfo newColInfo = new ColumnInfo(colInfo);
+ newColInfo.setInternalName(Utilities.ReduceField.VALUE + "." + outputColName);
+ newColInfo.setTabAlias(nm[0]);
+
+ outputRR.put(nm[0], nm[1], newColInfo);
+ if (nm2 != null) {
+ outputRR.addMappingOnly(nm2[0], nm2[1], newColInfo);
+ }
+ outputColumns.add(outputColName);
}
+ dummy.setParentOperators(null);
int numReds = -1;
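
The dummy-operator backtracking above is what makes key/value deduplication possible: expressions phrased against the child's output schema are traced back through the child to its own inputs, so that equal expressions can be recognized as the same column. A minimal sketch of the same call pattern, wrapped in a hypothetical helper (backtrackThrough is an illustrative name, not an API from this patch):

    import java.util.Arrays;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;

    // Hypothetical helper mirroring the calls above: splice a dummy operator
    // on top of child, resolve expr back through child, then detach the dummy.
    static ExprNodeDesc backtrackThrough(ExprNodeDesc expr, Operator child)
        throws SemanticException {
      Operator dummy = Operator.createDummy();
      dummy.setParentOperators(Arrays.asList(child));
      try {
        return ExprNodeDescUtils.backtrack(expr, dummy, child);
      } finally {
        dummy.setParentOperators(null);
      }
    }

A null result (for example, when the child is a script operator) simply means the column cannot be matched against a key and is shipped as a fresh value.
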
@@ -6647,11 +6788,23 @@ public class SemanticAnalyzer extends Ba
}
}
+ ReduceSinkDesc rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys,
+ reduceValues, outputColumns, false, tag,
+ reduceKeys.size(), numReds);
+
ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
- OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
- reduceValues, outputColumns, false, joinTree.getNextTag(),
- reduceKeys.size(), numReds), new RowSchema(outputRS
- .getColumnInfos()), child), outputRS);
+ OperatorFactory.getAndMakeChild(rsDesc, new RowSchema(outputRR
+ .getColumnInfos()), child), outputRR);
+ List<String> keyColNames = rsDesc.getOutputKeyColumnNames();
+ for (int i = 0 ; i < keyColNames.size(); i++) {
+ colExprMap.put(Utilities.ReduceField.KEY + "." + keyColNames.get(i), reduceKeys.get(i));
+ }
+ List<String> valColNames = rsDesc.getOutputValueColumnNames();
+ for (int i = 0 ; i < valColNames.size(); i++) {
+ colExprMap.put(Utilities.ReduceField.VALUE + "." + valColNames.get(i), reduceValues.get(i));
+ }
+
+ rsOp.setValueIndex(index);
rsOp.setColumnExprMap(colExprMap);
rsOp.setInputAliases(srcs);
return rsOp;
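
The rsOp.setValueIndex(index) call records how each input column survives deduplication: a non-negative entry k means the column is already shipped as ReduceSink key position k, and a negative entry -(v + 1) means it is value column v. A standalone sketch of that decoding convention (ValueIndexDecoder and decodeColumn are hypothetical names, not part of this patch):

    import java.util.List;

    // index[i] >= 0: column i is KEY.reducesinkkey<index[i]>;
    // index[i] <  0: column i is value column -(index[i] + 1).
    public final class ValueIndexDecoder {
      private ValueIndexDecoder() {
      }

      public static String decodeColumn(int indexEntry, List<String> valueColumnNames) {
        if (indexEntry >= 0) {
          return "KEY.reducesinkkey" + indexEntry;
        }
        return "VALUE." + valueColumnNames.get(-indexEntry - 1);
      }
    }

For index = {0, -1} this yields KEY.reducesinkkey0 and VALUE._col0 (given value column name _col0), which matches the condition-expression changes visible in the .q.out files below.
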
@@ -6672,16 +6825,15 @@ public class SemanticAnalyzer extends Ba
for (ASTNode cond : filter) {
joinSrcOp = genFilterPlan(qb, cond, joinSrcOp);
}
- String[] leftAliases = joinTree.getLeftAliases();
- joinSrcOp = genJoinReduceSinkChild(qb, joinTree, joinSrcOp, leftAliases, 0);
}
- Operator[] srcOps = new Operator[joinTree.getBaseSrc().length];
+ String[] baseSrc = joinTree.getBaseSrc();
+ Operator[] srcOps = new Operator[baseSrc.length];
HashSet<Integer> omitOpts = null; // set of input to the join that should be
// omitted by the output
int pos = 0;
- for (String src : joinTree.getBaseSrc()) {
+ for (String src : baseSrc) {
if (src != null) {
Operator srcOp = map.get(src.toLowerCase());
@@ -6700,21 +6852,24 @@ public class SemanticAnalyzer extends Ba
// generate a groupby operator (HASH mode) for a map-side partial
// aggregation for semijoin
- srcOp = genMapGroupByForSemijoin(qb, fields, srcOp,
+ srcOps[pos++] = genMapGroupByForSemijoin(qb, fields, srcOp,
GroupByDesc.Mode.HASH);
+ } else {
+ srcOps[pos++] = srcOp;
}
-
- // generate a ReduceSink operator for the join
- srcOps[pos] = genJoinReduceSinkChild(qb, joinTree, srcOp, new String[]{src}, pos);
- pos++;
} else {
assert pos == 0;
- srcOps[pos++] = null;
+ srcOps[pos++] = joinSrcOp;
}
}
- // Type checking and implicit type conversion for join keys
- genJoinOperatorTypeCheck(joinSrcOp, srcOps);
+ ExprNodeDesc[][] joinKeys = genJoinKeys(joinTree, srcOps);
+
+ for (int i = 0; i < srcOps.length; i++) {
+ // generate a ReduceSink operator for the join
+ String[] srcs = baseSrc[i] != null ? new String[] {baseSrc[i]} : joinTree.getLeftAliases();
+ srcOps[i] = genJoinReduceSinkChild(qb, joinKeys[i], srcOps[i], srcs, joinTree.getNextTag());
+ }
JoinOperator joinOp = (JoinOperator) genJoinOperatorChildren(joinTree,
joinSrcOp, srcOps, omitOpts);
@@ -6745,12 +6900,14 @@ public class SemanticAnalyzer extends Ba
ArrayList<ExprNodeDesc> colList = new ArrayList<ExprNodeDesc>();
ArrayList<String> columnNames = new ArrayList<String>();
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
// construct the list of columns that need to be projected
for (ASTNode field : fields) {
ExprNodeColumnDesc exprNode = (ExprNodeColumnDesc) genExprNodeDesc(field,
inputRR);
colList.add(exprNode);
columnNames.add(exprNode.getColumn());
+ colExprMap.put(exprNode.getColumn(), exprNode);
}
// create selection operator
@@ -6758,7 +6915,7 @@ public class SemanticAnalyzer extends Ba
new SelectDesc(colList, columnNames, false), new RowSchema(inputRR
.getColumnInfos()), input), inputRR);
- output.setColumnExprMap(input.getColumnExprMap());
+ output.setColumnExprMap(colExprMap);
return output;
}
@@ -6817,28 +6974,24 @@ public class SemanticAnalyzer extends Ba
return op;
}
- private void genJoinOperatorTypeCheck(Operator left, Operator[] right)
+ private ExprNodeDesc[][] genJoinOperatorTypeCheck(ExprNodeDesc[][] keys)
throws SemanticException {
// keys[i] -> ArrayList<exprNodeDesc> for the i-th join operator key list
- ArrayList<ArrayList<ExprNodeDesc>> keys = new ArrayList<ArrayList<ExprNodeDesc>>();
int keyLength = 0;
- for (int i = 0; i < right.length; i++) {
- Operator oi = (i == 0 && right[i] == null ? left : right[i]);
- ReduceSinkDesc now = ((ReduceSinkOperator) (oi)).getConf();
+ for (int i = 0; i < keys.length; i++) {
if (i == 0) {
- keyLength = now.getKeyCols().size();
+ keyLength = keys[i].length;
} else {
- assert (keyLength == now.getKeyCols().size());
+ assert keyLength == keys[i].length;
}
- keys.add(now.getKeyCols());
}
// implicit type conversion hierarchy
for (int k = 0; k < keyLength; k++) {
// Find the common class for type conversion
- TypeInfo commonType = keys.get(0).get(k).getTypeInfo();
- for (int i = 1; i < right.length; i++) {
+ TypeInfo commonType = keys[0][k].getTypeInfo();
+ for (int i = 1; i < keys.length; i++) {
TypeInfo a = commonType;
- TypeInfo b = keys.get(i).get(k).getTypeInfo();
+ TypeInfo b = keys[i][k].getTypeInfo();
commonType = FunctionRegistry.getCommonClassForComparison(a, b);
if (commonType == null) {
throw new SemanticException(
@@ -6847,27 +7000,15 @@ public class SemanticAnalyzer extends Ba
}
}
// Add implicit type conversion if necessary
- for (int i = 0; i < right.length; i++) {
+ for (int i = 0; i < keys.length; i++) {
if (TypeInfoUtils.isConversionRequiredForComparison(
- keys.get(i).get(k).getTypeInfo(),
- commonType)) {
- keys.get(i).set(
- k,
- ParseUtils.createConversionCast(
- keys.get(i).get(k), (PrimitiveTypeInfo)commonType));
+ keys[i][k].getTypeInfo(), commonType)) {
+ keys[i][k] = ParseUtils.createConversionCast(
+ keys[i][k], (PrimitiveTypeInfo)commonType);
}
}
}
- // regenerate keySerializationInfo because the ReduceSinkOperator's
- // output key types might have changed.
- for (int i = 0; i < right.length; i++) {
- Operator oi = (i == 0 && right[i] == null ? left : right[i]);
- ReduceSinkDesc now = ((ReduceSinkOperator) (oi)).getConf();
-
- now.setKeySerializeInfo(PlanUtils.getReduceKeyTableDesc(PlanUtils
- .getFieldSchemasFromColumnList(now.getKeyCols(), "joinkey"), now
- .getOrder()));
- }
+ return keys;
}
private Operator genJoinPlan(QB qb, Map<String, Operator> map)
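
genJoinOperatorTypeCheck now works purely on the expression arrays, before any ReduceSink exists, so there is no keySerializationInfo left to regenerate. The per-position logic reduces to finding a common comparison type and casting only where required; a compact sketch for a single key position, assuming hive-exec on the classpath (alignKeyPosition is an illustrative name):

    import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
    import org.apache.hadoop.hive.ql.parse.ParseUtils;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
    import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    // Resolve one key position across all join inputs to a common comparison
    // type, inserting conversion casts only where they are actually needed.
    static void alignKeyPosition(ExprNodeDesc[][] keys, int k) throws SemanticException {
      TypeInfo common = keys[0][k].getTypeInfo();
      for (int i = 1; i < keys.length; i++) {
        common = FunctionRegistry.getCommonClassForComparison(common, keys[i][k].getTypeInfo());
        if (common == null) {
          throw new SemanticException("Join keys at position " + k + " are not comparable");
        }
      }
      for (int i = 0; i < keys.length; i++) {
        if (TypeInfoUtils.isConversionRequiredForComparison(keys[i][k].getTypeInfo(), common)) {
          keys[i][k] = ParseUtils.createConversionCast(keys[i][k], (PrimitiveTypeInfo) common);
        }
      }
    }

For instance, an int key joined against a string key is typically compared as double, so both sides receive a cast.
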
@@ -9272,7 +9413,7 @@ public class SemanticAnalyzer extends Ba
optm.setPctx(pCtx);
optm.initialize(conf);
pCtx = optm.optimize();
-
+
FetchTask origFetchTask = pCtx.getFetchTask();
if (LOG.isDebugEnabled()) {
@@ -9301,7 +9442,7 @@ public class SemanticAnalyzer extends Ba
// if desired check we're not going over partition scan limits
enforceScanLimits(pCtx, origFetchTask);
}
-
+
return;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDescUtils.java Thu Jun 5 18:28:07 2014
@@ -74,7 +74,7 @@ public class ExprNodeDescUtils {
}
children.add(child);
}
- // duplicate function with possibily replaced children
+ // duplicate function with possibly replaced children
ExprNodeGenericFuncDesc clone = (ExprNodeGenericFuncDesc) func.clone();
clone.setChildren(children);
return clone;
@@ -164,7 +164,7 @@ public class ExprNodeDescUtils {
}
/**
- * Return false if the expression has any non determinitic function
+ * Return false if the expression has any non-deterministic function
*/
public static boolean isDeterministic(ExprNodeDesc desc) {
if (desc instanceof ExprNodeGenericFuncDesc) {
@@ -182,6 +182,14 @@ public class ExprNodeDescUtils {
return true;
}
+ public static ArrayList<ExprNodeDesc> clone(List<ExprNodeDesc> sources) {
+ ArrayList<ExprNodeDesc> result = new ArrayList<ExprNodeDesc>();
+ for (ExprNodeDesc expr : sources) {
+ result.add(expr.clone());
+ }
+ return result;
+ }
+
/**
* Convert expressions in current operator to those in terminal operator, which
* is an ancestor of current or null (back to top operator).
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java Thu Jun 5 18:28:07 2014
@@ -43,6 +43,7 @@ public class MapJoinDesc extends JoinDes
private int posBigTable;
+ private Map<Byte, int[]> valueIndices;
private Map<Byte, List<Integer>> retainList;
private transient String bigTableAlias;
@@ -77,6 +78,7 @@ public class MapJoinDesc extends JoinDes
this.keyTblDesc = clone.keyTblDesc;
this.valueTblDescs = clone.valueTblDescs;
this.posBigTable = clone.posBigTable;
+ this.valueIndices = clone.valueIndices;
this.retainList = clone.retainList;
this.bigTableAlias = clone.bigTableAlias;
this.aliasBucketFileNameMapping = clone.aliasBucketFileNameMapping;
@@ -125,6 +127,18 @@ public class MapJoinDesc extends JoinDes
this.parentToInput = parentToInput;
}
+ public Map<Byte, int[]> getValueIndices() {
+ return valueIndices;
+ }
+
+ public void setValueIndices(Map<Byte, int[]> valueIndices) {
+ this.valueIndices = valueIndices;
+ }
+
+ public int[] getValueIndex(byte alias) {
+ return valueIndices == null ? null : valueIndices.get(alias);
+ }
+
public Map<Byte, List<Integer>> getRetainList() {
return retainList;
}
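
valueIndices carries the per-alias index arrays from planning into the map-join runtime, so each small-table row can be rebuilt from its deduplicated key and value halves. A hypothetical sketch of a consumer (reassembleRow is illustrative, not an API added here):

    // Rebuild the full column array for one alias; a null index array means
    // nothing was deduplicated and the values already form the whole row.
    static Object[] reassembleRow(int[] valueIndex, Object[] keys, Object[] values) {
      if (valueIndex == null) {
        return values;
      }
      Object[] row = new Object[valueIndex.length];
      for (int i = 0; i < valueIndex.length; i++) {
        int ix = valueIndex[i];
        row[i] = ix >= 0 ? keys[ix] : values[-ix - 1];
      }
      return row;
    }

A caller would obtain the array via getValueIndex(alias) using the byte tag of the incoming row's parent.
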
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/ppd/ExprWalkerProcFactory.java Thu Jun 5 18:28:07 2014
@@ -87,6 +87,10 @@ public final class ExprWalkerProcFactory
if (exp instanceof ExprNodeGenericFuncDesc) {
isCandidate = false;
}
+ if (exp instanceof ExprNodeColumnDesc && colAlias == null) {
+ ExprNodeColumnDesc column = (ExprNodeColumnDesc)exp;
+ colAlias = new String[]{column.getTabAlias(), column.getColumn()};
+ }
}
ctx.addConvertedNode(colref, exp);
ctx.setIsCandidate(exp, isCandidate);
Modified: hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin6.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin6.q?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin6.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/bucketmapjoin6.q Thu Jun 5 18:28:07 2014
@@ -1,6 +1,7 @@
+set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
+
-- SORT_QUERY_RESULTS
-set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
create table tmp1 (a string, b string) clustered by (a) sorted by (a) into 10 buckets;
create table tmp2 (a string, b string) clustered by (a) sorted by (a) into 10 buckets;
Modified: hive/trunk/ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out (original)
+++ hive/trunk/ql/src/test/results/clientnegative/bucket_mapjoin_mismatch1.q.out Thu Jun 5 18:28:07 2014
@@ -117,14 +117,14 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: key (type: int)
Statistics: Num rows: 40 Data size: 4200 Basic stats: COMPLETE Column stats: NONE
- value expressions: key (type: int), value (type: string)
+ value expressions: value (type: string)
Reduce Operator Tree:
Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
- 0 {VALUE._col0} {VALUE._col1}
- 1 {VALUE._col1}
+ 0 {KEY.reducesinkkey0} {VALUE._col0}
+ 1 {VALUE._col0}
outputColumnNames: _col0, _col1, _col6
Statistics: Num rows: 44 Data size: 4620 Basic stats: COMPLETE Column stats: NONE
Select Operator
Modified: hive/trunk/ql/src/test/results/clientpositive/ambiguous_col.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/ambiguous_col.q.out?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/ambiguous_col.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/ambiguous_col.q.out Thu Jun 5 18:28:07 2014
@@ -24,7 +24,7 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 29 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
- value expressions: _col0 (type: string), _col1 (type: string)
+ value expressions: _col1 (type: string)
TableScan
alias: src1
Statistics: Num rows: 2 Data size: 216 Basic stats: COMPLETE Column stats: NONE
@@ -42,7 +42,7 @@ STAGE PLANS:
condition map:
Inner Join 0 to 1
condition expressions:
- 0 {VALUE._col0} {VALUE._col1}
+ 0 {KEY.reducesinkkey0} {VALUE._col0}
1
outputColumnNames: _col0, _col1
Statistics: Num rows: 31 Data size: 6393 Basic stats: COMPLETE Column stats: NONE
@@ -90,7 +90,6 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
- value expressions: _col0 (type: string)
TableScan
alias: src1
Statistics: Num rows: 2 Data size: 216 Basic stats: COMPLETE Column stats: NONE
@@ -108,7 +107,7 @@ STAGE PLANS:
condition map:
Inner Join 0 to 1
condition expressions:
- 0 {VALUE._col0}
+ 0 {KEY.reducesinkkey0}
1
outputColumnNames: _col0
Statistics: Num rows: 63 Data size: 6393 Basic stats: COMPLETE Column stats: NONE
@@ -156,7 +155,6 @@ STAGE PLANS:
sort order: +
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 58 Data size: 5812 Basic stats: COMPLETE Column stats: NONE
- value expressions: _col0 (type: string)
TableScan
alias: src1
Statistics: Num rows: 2 Data size: 216 Basic stats: COMPLETE Column stats: NONE
@@ -174,7 +172,7 @@ STAGE PLANS:
condition map:
Inner Join 0 to 1
condition expressions:
- 0 {VALUE._col0}
+ 0 {KEY.reducesinkkey0}
1
outputColumnNames: _col0
Statistics: Num rows: 63 Data size: 6393 Basic stats: COMPLETE Column stats: NONE
Modified: hive/trunk/ql/src/test/results/clientpositive/annotate_stats_join.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/annotate_stats_join.q.out?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
Files hive/trunk/ql/src/test/results/clientpositive/annotate_stats_join.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/annotate_stats_join.q.out Thu Jun 5 18:28:07 2014 differ
Modified: hive/trunk/ql/src/test/results/clientpositive/annotate_stats_select.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/annotate_stats_select.q.out?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
Files hive/trunk/ql/src/test/results/clientpositive/annotate_stats_select.q.out (original) and hive/trunk/ql/src/test/results/clientpositive/annotate_stats_select.q.out Thu Jun 5 18:28:07 2014 differ
Modified: hive/trunk/ql/src/test/results/clientpositive/auto_join0.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/auto_join0.q.out?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/auto_join0.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/auto_join0.q.out Thu Jun 5 18:28:07 2014
@@ -86,27 +86,24 @@ STAGE PLANS:
key expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
sort order: ++++
Statistics: Num rows: 9 Data size: 1983 Basic stats: COMPLETE Column stats: NONE
- value expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
Local Work:
Map Reduce Local Work
Reduce Operator Tree:
- Extract
+ Select Operator
+ expressions: KEY.reducesinkkey0 (type: string), KEY.reducesinkkey1 (type: string), KEY.reducesinkkey2 (type: string), KEY.reducesinkkey3 (type: string)
+ outputColumnNames: _col0, _col1, _col2, _col3
Statistics: Num rows: 9 Data size: 1983 Basic stats: COMPLETE Column stats: NONE
- Select Operator
- expressions: _col0 (type: string), _col1 (type: string), _col2 (type: string), _col3 (type: string)
- outputColumnNames: _col0, _col1, _col2, _col3
- Statistics: Num rows: 9 Data size: 1983 Basic stats: COMPLETE Column stats: NONE
- Group By Operator
- aggregations: sum(hash(_col0,_col1,_col2,_col3))
- mode: hash
- outputColumnNames: _col0
- Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.SequenceFileInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Group By Operator
+ aggregations: sum(hash(_col0,_col1,_col2,_col3))
+ mode: hash
+ outputColumnNames: _col0
+ Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
Stage: Stage-3
Map Reduce
Modified: hive/trunk/ql/src/test/results/clientpositive/auto_join13.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/auto_join13.q.out?rev=1600719&r1=1600718&r2=1600719&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/auto_join13.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/auto_join13.q.out Thu Jun 5 18:28:07 2014
@@ -67,7 +67,7 @@ STAGE PLANS:
Statistics: Num rows: 19 Data size: 1903 Basic stats: COMPLETE Column stats: NONE
HashTable Sink Operator
condition expressions:
- 0 {_col3} {_col0}
+ 0 {_col0} {_col3}
1
keys:
0 (_col0 + _col2) (type: double)
@@ -101,19 +101,19 @@ STAGE PLANS:
condition map:
Inner Join 0 to 1
condition expressions:
- 0 {_col3} {_col0}
+ 0 {_col0} {_col3}
1
keys:
0 (_col0 + _col2) (type: double)
1 UDFToDouble(_col0) (type: double)
- outputColumnNames: _col1, _col2
+ outputColumnNames: _col0, _col3
Statistics: Num rows: 22 Data size: 2302 Basic stats: COMPLETE Column stats: NONE
Select Operator
- expressions: _col2 (type: string), _col1 (type: string)
- outputColumnNames: _col2, _col1
+ expressions: _col0 (type: string), _col3 (type: string)
+ outputColumnNames: _col0, _col3
Statistics: Num rows: 22 Data size: 2302 Basic stats: COMPLETE Column stats: NONE
Group By Operator
- aggregations: sum(hash(_col2,_col1))
+ aggregations: sum(hash(_col0,_col3))
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE