Posted to commits@hive.apache.org by he...@apache.org on 2012/01/03 19:10:36 UTC
svn commit: r1226903 [1/5] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org/apache/hadoop/hive/ql/plan/ ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/
Author: heyongqiang
Date: Tue Jan 3 18:10:34 2012
New Revision: 1226903
URL: http://svn.apache.org/viewvc?rev=1226903&view=rev
Log:
HIVE-2621: Allow multiple group bys with the same input data and spray keys to be run on the same reducer. (Kevin via He Yongqiang)
Added:
hive/trunk/ql/src/test/queries/clientpositive/groupby7_map_multi_single_reducer.q
hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew_multi_single_reducer.q
hive/trunk/ql/src/test/queries/clientpositive/groupby_complex_types_multi_single_reducer.q
hive/trunk/ql/src/test/queries/clientpositive/groupby_multi_single_reducer.q
hive/trunk/ql/src/test/results/clientpositive/groupby7_map_multi_single_reducer.q.out
hive/trunk/ql/src/test/results/clientpositive/groupby7_noskew_multi_single_reducer.q.out
hive/trunk/ql/src/test/results/clientpositive/groupby_complex_types_multi_single_reducer.q.out
hive/trunk/ql/src/test/results/clientpositive/groupby_multi_single_reducer.q.out
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java
hive/trunk/ql/src/test/queries/clientpositive/groupby10.q
hive/trunk/ql/src/test/queries/clientpositive/groupby7_map.q
hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew.q
hive/trunk/ql/src/test/queries/clientpositive/groupby8.q
hive/trunk/ql/src/test/queries/clientpositive/groupby9.q
hive/trunk/ql/src/test/queries/clientpositive/multigroupby_singlemr.q
hive/trunk/ql/src/test/results/clientpositive/groupby10.q.out
hive/trunk/ql/src/test/results/clientpositive/groupby8.q.out
hive/trunk/ql/src/test/results/clientpositive/groupby9.q.out
hive/trunk/ql/src/test/results/clientpositive/multi_insert.q.out
hive/trunk/ql/src/test/results/clientpositive/multigroupby_singlemr.q.out
hive/trunk/ql/src/test/results/clientpositive/parallel.q.out
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Jan 3 18:10:34 2012
@@ -350,7 +350,7 @@ public class HiveConf extends Configurat
HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY("hive.mapjoin.followby.map.aggr.hash.percentmemory", (float) 0.3),
HIVEMAPAGGRMEMORYTHRESHOLD("hive.map.aggr.hash.force.flush.memory.threshold", (float) 0.9),
HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float) 0.5),
- HIVEMULTIGROUPBYSINGLEMR("hive.multigroupby.singlemr", false),
+ HIVEMULTIGROUPBYSINGLEREDUCER("hive.multigroupby.singlereducer", true),
// for hive udtf operator
HIVEUDTFAUTOPROGRESS("hive.udtf.auto.progress", false),
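For reference, the renamed flag is read and overridden through HiveConf's boolean accessors, the same getBoolVar/setBoolVar calls the SemanticAnalyzer changes below rely on. A minimal sketch, assuming a hypothetical demo class name (the accessors and the new ConfVars entry are taken from this patch):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class MultiGroupBySingleReducerDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // After this commit the optimization is on by default...
        boolean enabled = conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEREDUCER);
        System.out.println("hive.multigroupby.singlereducer = " + enabled);
        // ...but a session can still opt out, as several of the updated .q
        // tests do with "set hive.multigroupby.singlereducer=false;".
        conf.setBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEREDUCER, false);
      }
    }
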
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Tue Jan 3 18:10:34 2012
@@ -33,6 +33,7 @@ import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
+import org.antlr.runtime.tree.Tree;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.Path;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryProperties;
import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.ExecDriver;
@@ -58,7 +60,6 @@ import org.apache.hadoop.hive.ql.exec.Fu
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapRedTask;
-import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.RecordReader;
@@ -153,6 +154,7 @@ import org.apache.hadoop.hive.ql.session
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.Deserializer;
@@ -170,7 +172,6 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.mapred.InputFormat;
-import org.antlr.runtime.tree.Tree;
/**
* Implementation of the semantic analyzer.
@@ -2892,12 +2893,62 @@ public class SemanticAnalyzer extends Ba
RowResolver reduceSinkOutputRowResolver = new RowResolver();
reduceSinkOutputRowResolver.setIsExprResolver(true);
Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
- ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
// Pre-compute group-by keys and store in reduceKeys
List<String> outputKeyColumnNames = new ArrayList<String>();
List<String> outputValueColumnNames = new ArrayList<String>();
List<ASTNode> grpByExprs = getGroupByForClause(parseInfo, dest);
+
+ ArrayList<ExprNodeDesc> reduceKeys = getReduceKeysForReduceSink(grpByExprs, dest,
+ reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames,
+ colExprMap);
+
+ List<List<Integer>> distinctColIndices = getDistinctColIndicesForReduceSink(parseInfo, dest,
+ reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames);
+
+ ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
+ HashMap<String, ASTNode> aggregationTrees = parseInfo
+ .getAggregationExprsForClause(dest);
+
+ if (!mapAggrDone) {
+ getReduceValuesForReduceSinkNoMapAgg(parseInfo, dest, reduceSinkInputRowResolver,
+ reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues);
+ } else {
+ // Put partial aggregation results in reduceValues
+ int inputField = reduceKeys.size();
+
+ for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
+
+ TypeInfo type = reduceSinkInputRowResolver.getColumnInfos().get(
+ inputField).getType();
+ reduceValues.add(new ExprNodeColumnDesc(type,
+ getColumnInternalName(inputField), "", false));
+ inputField++;
+ outputValueColumnNames.add(getColumnInternalName(reduceValues.size() - 1));
+ String field = Utilities.ReduceField.VALUE.toString() + "."
+ + getColumnInternalName(reduceValues.size() - 1);
+ reduceSinkOutputRowResolver.putExpression(entry.getValue(),
+ new ColumnInfo(field, type, null, false));
+ }
+ }
+
+ ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
+ OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
+ grpByExprs.size(), reduceValues, distinctColIndices,
+ outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields,
+ numReducers), new RowSchema(reduceSinkOutputRowResolver
+ .getColumnInfos()), inputOperatorInfo), reduceSinkOutputRowResolver);
+ rsOp.setColumnExprMap(colExprMap);
+ return rsOp;
+ }
+
+ private ArrayList<ExprNodeDesc> getReduceKeysForReduceSink(List<ASTNode> grpByExprs, String dest,
+ RowResolver reduceSinkInputRowResolver, RowResolver reduceSinkOutputRowResolver,
+ List<String> outputKeyColumnNames, Map<String, ExprNodeDesc> colExprMap)
+ throws SemanticException {
+
+ ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
+
for (int i = 0; i < grpByExprs.size(); ++i) {
ASTNode grpbyExpr = grpByExprs.get(i);
ExprNodeDesc inputExpr = genExprNodeDesc(grpbyExpr,
@@ -2917,7 +2968,16 @@ public class SemanticAnalyzer extends Ba
}
}
+ return reduceKeys;
+ }
+
+ private List<List<Integer>> getDistinctColIndicesForReduceSink(QBParseInfo parseInfo, String dest,
+ ArrayList<ExprNodeDesc> reduceKeys, RowResolver reduceSinkInputRowResolver,
+ RowResolver reduceSinkOutputRowResolver, List<String> outputKeyColumnNames)
+ throws SemanticException {
+
List<List<Integer>> distinctColIndices = new ArrayList<List<Integer>>();
+
// If there is a distinctFuncExp, add all parameters to the reduceKeys.
if (!parseInfo.getDistinctFuncExprsForClause(dest).isEmpty()) {
List<ASTNode> distFuncs = parseInfo.getDistinctFuncExprsForClause(dest);
@@ -2957,17 +3017,80 @@ public class SemanticAnalyzer extends Ba
}
}
- ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
+ return distinctColIndices;
+ }
+
+ private void getReduceValuesForReduceSinkNoMapAgg(QBParseInfo parseInfo, String dest,
+ RowResolver reduceSinkInputRowResolver, RowResolver reduceSinkOutputRowResolver,
+ List<String> outputValueColumnNames, ArrayList<ExprNodeDesc> reduceValues)
+ throws SemanticException {
HashMap<String, ASTNode> aggregationTrees = parseInfo
.getAggregationExprsForClause(dest);
- if (!mapAggrDone) {
- // Put parameters to aggregations in reduceValues
- for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
- ASTNode value = entry.getValue();
- // 0 is function name
- for (int i = 1; i < value.getChildCount(); i++) {
- ASTNode parameter = (ASTNode) value.getChild(i);
+ // Put parameters to aggregations in reduceValues
+ for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
+ ASTNode value = entry.getValue();
+ // 0 is function name
+ for (int i = 1; i < value.getChildCount(); i++) {
+ ASTNode parameter = (ASTNode) value.getChild(i);
+ if (reduceSinkOutputRowResolver.getExpression(parameter) == null) {
+ reduceValues.add(genExprNodeDesc(parameter,
+ reduceSinkInputRowResolver));
+ outputValueColumnNames
+ .add(getColumnInternalName(reduceValues.size() - 1));
+ String field = Utilities.ReduceField.VALUE.toString() + "."
+ + getColumnInternalName(reduceValues.size() - 1);
+ reduceSinkOutputRowResolver.putExpression(parameter, new ColumnInfo(field,
+ reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null,
+ false));
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("nls")
+ private Operator genCommonGroupByPlanReduceSinkOperator(QB qb, List<String> dests,
+ Operator inputOperatorInfo) throws SemanticException {
+
+ RowResolver reduceSinkInputRowResolver = opParseCtx.get(inputOperatorInfo)
+ .getRowResolver();
+ QBParseInfo parseInfo = qb.getParseInfo();
+ RowResolver reduceSinkOutputRowResolver = new RowResolver();
+ reduceSinkOutputRowResolver.setIsExprResolver(true);
+ Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+
+ // The group by keys and distinct keys should be the same for all dests, so using the first
+ // one to produce these will be the same as using any other.
+ String dest = dests.get(0);
+
+ // Pre-compute group-by keys and store in reduceKeys
+ List<String> outputKeyColumnNames = new ArrayList<String>();
+ List<String> outputValueColumnNames = new ArrayList<String>();
+ List<ASTNode> grpByExprs = getGroupByForClause(parseInfo, dest);
+
+ ArrayList<ExprNodeDesc> reduceKeys = getReduceKeysForReduceSink(grpByExprs, dest,
+ reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames,
+ colExprMap);
+
+ List<List<Integer>> distinctColIndices = getDistinctColIndicesForReduceSink(parseInfo, dest,
+ reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames);
+
+ ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
+
+ // The dests can have different non-distinct aggregations, so we have to iterate over all of
+ // them
+ for (String destination : dests) {
+
+ getReduceValuesForReduceSinkNoMapAgg(parseInfo, destination, reduceSinkInputRowResolver,
+ reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues);
+
+ // Need to pass all of the columns used in the where clauses as reduce values
+ ASTNode whereClause = parseInfo.getWhrForClause(destination);
+ if (whereClause != null) {
+ List<ASTNode> columnExprs =
+ getColumnExprsFromASTNode(whereClause, reduceSinkInputRowResolver);
+ for (int i = 0; i < columnExprs.size(); i++) {
+ ASTNode parameter = columnExprs.get(i);
if (reduceSinkOutputRowResolver.getExpression(parameter) == null) {
reduceValues.add(genExprNodeDesc(parameter,
reduceSinkInputRowResolver));
@@ -2981,36 +3104,47 @@ public class SemanticAnalyzer extends Ba
}
}
}
- } else {
- // Put partial aggregation results in reduceValues
- int inputField = reduceKeys.size();
-
- for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
-
- TypeInfo type = reduceSinkInputRowResolver.getColumnInfos().get(
- inputField).getType();
- reduceValues.add(new ExprNodeColumnDesc(type,
- getColumnInternalName(inputField), "", false));
- inputField++;
- outputValueColumnNames.add(getColumnInternalName(reduceValues.size() - 1));
- String field = Utilities.ReduceField.VALUE.toString() + "."
- + getColumnInternalName(reduceValues.size() - 1);
- reduceSinkOutputRowResolver.putExpression(entry.getValue(),
- new ColumnInfo(field, type, null, false));
- }
}
ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
grpByExprs.size(), reduceValues, distinctColIndices,
- outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields,
- numReducers), new RowSchema(reduceSinkOutputRowResolver
+ outputKeyColumnNames, outputValueColumnNames, true, -1, grpByExprs.size(),
+ -1), new RowSchema(reduceSinkOutputRowResolver
.getColumnInfos()), inputOperatorInfo), reduceSinkOutputRowResolver);
rsOp.setColumnExprMap(colExprMap);
return rsOp;
}
/**
+ * Given an ASTNode, it returns all of the descendant ASTNodes which represent column expressions
+ *
+ * @param node
+ * @param inputRR
+ * @return
+ * @throws SemanticException
+ */
+ private List<ASTNode> getColumnExprsFromASTNode(ASTNode node, RowResolver inputRR)
+ throws SemanticException {
+
+ List<ASTNode> nodes = new ArrayList<ASTNode>();
+ if (node.getChildCount() == 0) {
+ return nodes;
+ }
+ for (int i = 0; i < node.getChildCount(); i++) {
+ ASTNode child = (ASTNode)node.getChild(i);
+ if (child.getType() == HiveParser.TOK_TABLE_OR_COL && child.getChild(0) != null &&
+ inputRR.get(null,
+ BaseSemanticAnalyzer.unescapeIdentifier(child.getChild(0).getText())) != null) {
+ nodes.add(child);
+ } else {
+ nodes.addAll(getColumnExprsFromASTNode(child, inputRR));
+ }
+ }
+ return nodes;
+ }
+
+ /**
* Generate the second ReduceSinkOperator for the Group By Plan
* (parseInfo.getXXX(dest)). The new ReduceSinkOperator will be a child of
* groupByOperatorInfo.
@@ -3223,6 +3357,99 @@ public class SemanticAnalyzer extends Ba
return groupByOperatorInfo;
}
+ @SuppressWarnings({"nls"})
+ private Operator genGroupByPlan1MRMultiReduceGB(List<String> dests, QB qb, Operator input)
+ throws SemanticException {
+
+ QBParseInfo parseInfo = qb.getParseInfo();
+
+ ExprNodeDesc previous = null;
+ Operator selectInput = input;
+
+ // To facilitate partition pruning, OR the where clauses together and put the combined
+ // filter at the top of the operator tree; this can also reduce the amount of data going to the reducer
+ List<ExprNodeDesc.ExprNodeDescEqualityWrapper> whereExpressions =
+ new ArrayList<ExprNodeDesc.ExprNodeDescEqualityWrapper>();
+ for (String dest : dests) {
+ ASTNode whereExpr = parseInfo.getWhrForClause(dest);
+
+ if (whereExpr != null) {
+ OpParseContext inputCtx = opParseCtx.get(input);
+ RowResolver inputRR = inputCtx.getRowResolver();
+ ExprNodeDesc current = genExprNodeDesc((ASTNode)whereExpr.getChild(0), inputRR);
+
+ // Check the list of where expressions already added so they aren't duplicated
+ ExprNodeDesc.ExprNodeDescEqualityWrapper currentWrapped =
+ new ExprNodeDesc.ExprNodeDescEqualityWrapper(current);
+ if (!whereExpressions.contains(currentWrapped)) {
+ whereExpressions.add(currentWrapped);
+ } else {
+ continue;
+ }
+
+ if (previous == null) {
+ // If this is the first expression
+ previous = current;
+ continue;
+ }
+
+ GenericUDFOPOr or = new GenericUDFOPOr();
+ List<ExprNodeDesc> expressions = new ArrayList<ExprNodeDesc>(2);
+ expressions.add(previous);
+ expressions.add(current);
+ ExprNodeDesc orExpr =
+ new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, or, expressions);
+ previous = orExpr;
+ } else {
+ // If a destination does not have a where clause, there can be no common filter
+ previous = null;
+ break;
+ }
+ }
+
+ if (previous != null) {
+ OpParseContext inputCtx = opParseCtx.get(input);
+ RowResolver inputRR = inputCtx.getRowResolver();
+ FilterDesc orFilterDesc = new FilterDesc(previous, false);
+
+ selectInput = putOpInsertMap(OperatorFactory.getAndMakeChild(
+ orFilterDesc, new RowSchema(
+ inputRR.getColumnInfos()), input), inputRR);
+ }
+
+ // insert a select operator here used by the ColumnPruner to reduce
+ // the data to shuffle
+ Operator select = insertSelectAllPlanForGroupBy(selectInput);
+
+ // Generate ReduceSinkOperator
+ Operator reduceSinkOperatorInfo = genCommonGroupByPlanReduceSinkOperator(qb, dests, select);
+
+ // It is assumed throughout the code that a reducer has a single child; add a
+ // ForwardOperator so that we can add multiple filter/group by operators as children
+ RowResolver reduceSinkOperatorInfoRR = opParseCtx.get(reduceSinkOperatorInfo).getRowResolver();
+ Operator forwardOp = putOpInsertMap(OperatorFactory.getAndMakeChild(new ForwardDesc(),
+ new RowSchema(reduceSinkOperatorInfoRR.getColumnInfos()), reduceSinkOperatorInfo),
+ reduceSinkOperatorInfoRR);
+
+ Operator curr = forwardOp;
+
+ for (String dest : dests) {
+ curr = forwardOp;
+
+ if (parseInfo.getWhrForClause(dest) != null) {
+ curr = genFilterPlan(dest, qb, forwardOp);
+ }
+
+ // Generate GroupbyOperator
+ Operator groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
+ dest, curr, GroupByDesc.Mode.COMPLETE, null);
+
+ curr = genPostGroupByBodyPlan(groupByOperatorInfo, dest, qb);
+ }
+
+ return curr;
+ }
+
static ArrayList<GenericUDAFEvaluator> getUDAFEvaluators(
ArrayList<AggregationDesc> aggs) {
ArrayList<GenericUDAFEvaluator> result = new ArrayList<GenericUDAFEvaluator>();
@@ -3290,40 +3517,6 @@ public class SemanticAnalyzer extends Ba
}
/**
- * Generate a Multi Group-By plan using a single map-reduce job.
- *
- * @param dest
- * @param qb
- * @param input
- * @return
- * @throws SemanticException
- *
- * Generate a Group-By plan using single map-reduce job, if there is
- * common group by key. Spray by the
- * common group by key set and compute
- * aggregates in the reduce. The agggregation evaluation
- * functions are as follows:
- *
- * Partitioning Key: common group by key set
- *
- * Sorting Key: group by keys, distinct keys
- *
- * Reducer: iterate/terminate (mode = COMPLETE)
- *
- */
- private Operator<?> genGroupByPlan1MRMultiGroupBy(String dest, QB qb,
- Operator<?> input) throws SemanticException {
-
- QBParseInfo parseInfo = qb.getParseInfo();
-
- // ////// Generate GroupbyOperator
- Operator<?> groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
- dest, input, GroupByDesc.Mode.COMPLETE, null);
-
- return groupByOperatorInfo;
- }
-
- /**
* Generate a Group-By plan using a 2 map-reduce jobs (5 operators will be
* inserted):
*
@@ -5489,7 +5682,7 @@ public class SemanticAnalyzer extends Ba
}
}
- private Operator insertSelectAllPlanForGroupBy(String dest, Operator input)
+ private Operator insertSelectAllPlanForGroupBy(Operator input)
throws SemanticException {
OpParseContext inputCtx = opParseCtx.get(input);
RowResolver inputRR = inputCtx.getRowResolver();
@@ -5528,7 +5721,7 @@ public class SemanticAnalyzer extends Ba
return null;
}
- List<ExprNodeDesc> oldList = null;
+ List<ExprNodeDesc.ExprNodeDescEqualityWrapper> oldList = null;
List<ASTNode> oldASTList = null;
for (String dest : ks) {
@@ -5548,31 +5741,27 @@ public class SemanticAnalyzer extends Ba
return null;
}
- List<ExprNodeDesc> currDestList = new ArrayList<ExprNodeDesc>();
+ List<ExprNodeDesc.ExprNodeDescEqualityWrapper> currDestList;
+ try {
+ currDestList = getDistinctExprs(qbp, dest, inputRR);
+ } catch (SemanticException e) {
+ return null;
+ }
+
List<ASTNode> currASTList = new ArrayList<ASTNode>();
for (ASTNode value: list) {
- try {
- // 0 is function name
- for (int i = 1; i < value.getChildCount(); i++) {
- ASTNode parameter = (ASTNode) value.getChild(i);
- currDestList.add(genExprNodeDesc(parameter, inputRR));
- currASTList.add(parameter);
- }
- } catch (SemanticException e) {
- return null;
+ // 0 is function name
+ for (int i = 1; i < value.getChildCount(); i++) {
+ ASTNode parameter = (ASTNode) value.getChild(i);
+ currASTList.add(parameter);
}
if (oldList == null) {
oldList = currDestList;
oldASTList = currASTList;
} else {
- if (oldList.size() != currDestList.size()) {
+ if (!matchExprLists(oldList, currDestList)) {
return null;
}
- for (int pos = 0; pos < oldList.size(); pos++) {
- if (!oldList.get(pos).isSame(currDestList.get(pos))) {
- return null;
- }
- }
}
}
}
@@ -5671,185 +5860,130 @@ public class SemanticAnalyzer extends Ba
return rsOp;
}
- // see if there are any distinct expressions
- private boolean distinctExprsExists(QB qb) {
+ // Groups the clause names into lists so that any two clauses in the same list have the same
+ // group by and distinct keys and no clause appears in more than one list. Returns a list of the
+ // lists of clauses.
+ private List<List<String>> getCommonGroupByDestGroups(QB qb, Operator input)
+ throws SemanticException {
+
+ RowResolver inputRR = opParseCtx.get(input).getRowResolver();
QBParseInfo qbp = qb.getParseInfo();
TreeSet<String> ks = new TreeSet<String>();
ks.addAll(qbp.getClauseNames());
- for (String dest : ks) {
- List<ASTNode> list = qbp.getDistinctFuncExprsForClause(dest);
- if (!list.isEmpty()) {
- return true;
- }
- }
- return false;
- }
-
- // return the common group by key set.
- // Null if there are no common group by keys.
- private List<ASTNode> getCommonGroupbyKeys(QB qb, Operator input) {
- RowResolver inputRR = opParseCtx.get(input).getRowResolver();
- QBParseInfo qbp = qb.getParseInfo();
+ List<List<String>> commonGroupByDestGroups = new ArrayList<List<String>>();
- Set<String> ks = qbp.getClauseNames();
- // Go over all the destination tables
+ // If this is a trivial query block, return
if (ks.size() <= 1) {
- return null;
+ List<String> oneList = new ArrayList<String>(1);
+ if (ks.size() == 1) {
+ oneList.add(ks.first());
+ }
+ commonGroupByDestGroups.add(oneList);
+ return commonGroupByDestGroups;
}
- List<ASTNode> oldList = null;
+ List<List<ExprNodeDesc.ExprNodeDescEqualityWrapper>> sprayKeyLists =
+ new ArrayList<List<ExprNodeDesc.ExprNodeDescEqualityWrapper>>(ks.size());
+ // Iterate over each clause
for (String dest : ks) {
- // If a filter is present, common processing is not possible
- if (qbp.getWhrForClause(dest) != null) {
- return null;
- }
- // if one of the sub-queries does not involve an aggregation, common
- // processing is not possible
- List<ASTNode> list = getGroupByForClause(qbp, dest);
- if (list.isEmpty()) {
- return null;
+ List<ExprNodeDesc.ExprNodeDescEqualityWrapper> sprayKeys =
+ getDistinctExprs(qbp, dest, inputRR);
+
+ // Add the group by expressions
+ List<ASTNode> grpByExprs = getGroupByForClause(qbp, dest);
+ for (ASTNode grpByExpr: grpByExprs) {
+ ExprNodeDesc.ExprNodeDescEqualityWrapper grpByExprWrapper =
+ new ExprNodeDesc.ExprNodeDescEqualityWrapper(genExprNodeDesc(grpByExpr, inputRR));
+ if (!sprayKeys.contains(grpByExprWrapper)) {
+ sprayKeys.add(grpByExprWrapper);
+ }
}
- if (oldList == null) {
- oldList = new ArrayList<ASTNode>();
- oldList.addAll(list);
- } else {
- int pos = 0;
- for (pos = 0; pos < oldList.size(); pos++) {
- if (pos < list.size()) {
- if (!oldList.get(pos).toStringTree().equals(list.get(pos).toStringTree())) {
- break;
- }
- } else {
- break;
- }
+
+ // Loop through each of the lists of exprs, looking for a match
+ boolean found = false;
+ for (int i = 0; i < sprayKeyLists.size(); i++) {
+
+ if (!matchExprLists(sprayKeyLists.get(i), sprayKeys)) {
+ continue;
}
- oldList = oldList.subList(0, pos);
+
+ // A match was found, so add the clause to the corresponding list
+ commonGroupByDestGroups.get(i).add(dest);
+ found = true;
+ break;
}
- if (oldList.isEmpty()) {
- return null;
+
+ // No match was found, so create new entries
+ if (!found) {
+ sprayKeyLists.add(sprayKeys);
+ List<String> destGroup = new ArrayList<String>();
+ destGroup.add(dest);
+ commonGroupByDestGroups.add(destGroup);
}
}
- return oldList;
- }
- /**
- * Generates reduce sink for multigroupby query for non null common groupby set
- *
- *All groupby keys and distinct exprs are added to reduce keys. And rows are
- *partitioned on common groupby key set.
- *
- * @param qb
- * @param input
- * @return
- * @throws SemanticException
- */
- private Operator createCommonReduceSink1(QB qb, Operator input)
- throws SemanticException {
- // Go over all the tables and get common groupby key
- List<ASTNode> cmonGbyExprs = getCommonGroupbyKeys(qb, input);
+ return commonGroupByDestGroups;
+ }
- QBParseInfo qbp = qb.getParseInfo();
- TreeSet<String> ks = new TreeSet<String>();
- ks.addAll(qbp.getClauseNames());
+ // Returns whether or not two lists contain the same elements independent of order
+ private boolean matchExprLists(List<ExprNodeDesc.ExprNodeDescEqualityWrapper> list1,
+ List<ExprNodeDesc.ExprNodeDescEqualityWrapper> list2) {
- // Pass the entire row
- RowResolver inputRR = opParseCtx.get(input).getRowResolver();
- RowResolver reduceSinkOutputRowResolver = new RowResolver();
- reduceSinkOutputRowResolver.setIsExprResolver(true);
- ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
- ArrayList<ExprNodeDesc> reducePartKeys = new ArrayList<ExprNodeDesc>();
- ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
- Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
- List<String> outputColumnNames = new ArrayList<String>();
- for (String dest : ks) {
- List<ASTNode> grpByExprs = getGroupByForClause(qbp, dest);
- for (int i = 0; i < grpByExprs.size(); ++i) {
- ASTNode grpbyExpr = grpByExprs.get(i);
+ if (list1.size() != list2.size()) {
+ return false;
+ }
- if (reduceSinkOutputRowResolver.getExpression(grpbyExpr) == null) {
- ExprNodeDesc grpByExprNode = genExprNodeDesc(grpbyExpr, inputRR);
- reduceKeys.add(grpByExprNode);
- String field = Utilities.ReduceField.KEY.toString() + "."
- + getColumnInternalName(reduceKeys.size() - 1);
- ColumnInfo colInfo = new ColumnInfo(field, reduceKeys.get(
- reduceKeys.size() - 1).getTypeInfo(), "", false);
- reduceSinkOutputRowResolver.putExpression(grpbyExpr, colInfo);
- outputColumnNames.add(getColumnInternalName(reduceKeys.size() - 1));
- colExprMap.put(colInfo.getInternalName(), grpByExprNode);
- }
+ for (ExprNodeDesc.ExprNodeDescEqualityWrapper exprNodeDesc : list1) {
+ if (!list2.contains(exprNodeDesc)) {
+ return false;
}
}
- // Add distinct group-by exprs to reduceKeys
- List<ASTNode> distExprs = getCommonDistinctExprs(qb, input);
- if (distExprs != null) {
- for (ASTNode distn : distExprs) {
- if (reduceSinkOutputRowResolver.getExpression(distn) == null) {
- ExprNodeDesc distExpr = genExprNodeDesc(distn, inputRR);
- reduceKeys.add(distExpr);
- String field = Utilities.ReduceField.KEY.toString() + "."
- + getColumnInternalName(reduceKeys.size() - 1);
- ColumnInfo colInfo = new ColumnInfo(field, reduceKeys.get(
- reduceKeys.size() - 1).getTypeInfo(), "", false);
- reduceSinkOutputRowResolver.putExpression(distn, colInfo);
- outputColumnNames.add(getColumnInternalName(reduceKeys.size() - 1));
- colExprMap.put(colInfo.getInternalName(), distExpr);
+
+ return true;
+ }
+
+ // Returns a list of the distinct exprs for a given clause name as
+ // ExprNodeDesc.ExprNodeDescEqualityWrapper without duplicates
+ private List<ExprNodeDesc.ExprNodeDescEqualityWrapper>
+ getDistinctExprs(QBParseInfo qbp, String dest, RowResolver inputRR) throws SemanticException {
+
+ List<ASTNode> distinctAggExprs = qbp.getDistinctFuncExprsForClause(dest);
+ List<ExprNodeDesc.ExprNodeDescEqualityWrapper> distinctExprs =
+ new ArrayList<ExprNodeDesc.ExprNodeDescEqualityWrapper>();
+
+ for (ASTNode distinctAggExpr: distinctAggExprs) {
+ // 0 is function name
+ for (int i = 1; i < distinctAggExpr.getChildCount(); i++) {
+ ASTNode parameter = (ASTNode) distinctAggExpr.getChild(i);
+ ExprNodeDesc.ExprNodeDescEqualityWrapper distinctExpr =
+ new ExprNodeDesc.ExprNodeDescEqualityWrapper(genExprNodeDesc(parameter, inputRR));
+ if (!distinctExprs.contains(distinctExpr)) {
+ distinctExprs.add(distinctExpr);
}
}
}
- // Add common groupby keys to partition keys
- for (ASTNode gby : cmonGbyExprs) {
- ExprNodeDesc distExpr = genExprNodeDesc(gby, inputRR);
- reducePartKeys.add(distExpr);
- }
-
- // Go over all the aggregations
- for (String dest : ks) {
- // For each aggregation
- HashMap<String, ASTNode> aggregationTrees = qbp
- .getAggregationExprsForClause(dest);
- assert (aggregationTrees != null);
+ return distinctExprs;
+ }
- for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
- ASTNode value = entry.getValue();
- value.getChild(0).getText();
+ // see if there are any distinct expressions
+ private boolean distinctExprsExists(QB qb) {
+ QBParseInfo qbp = qb.getParseInfo();
- // 0 is the function name
- for (int i = 1; i < value.getChildCount(); i++) {
- ASTNode paraExpr = (ASTNode) value.getChild(i);
+ TreeSet<String> ks = new TreeSet<String>();
+ ks.addAll(qbp.getClauseNames());
- if (reduceSinkOutputRowResolver.getExpression(paraExpr) == null) {
- ExprNodeDesc paraExprNode = genExprNodeDesc(paraExpr, inputRR);
- reduceValues.add(paraExprNode);
- String field = Utilities.ReduceField.VALUE.toString() + "."
- + getColumnInternalName(reduceValues.size() - 1);
- ColumnInfo colInfo = new ColumnInfo(field, reduceValues.get(
- reduceValues.size() - 1).getTypeInfo(), "", false);
- reduceSinkOutputRowResolver.putExpression(paraExpr, colInfo);
- outputColumnNames
- .add(getColumnInternalName(reduceValues.size() - 1));
- }
- }
+ for (String dest : ks) {
+ List<ASTNode> list = qbp.getDistinctFuncExprsForClause(dest);
+ if (!list.isEmpty()) {
+ return true;
}
}
- StringBuilder order = new StringBuilder();
- for (int i = 0; i < reduceKeys.size(); i++) {
- order.append("+");
- }
-
- ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
- OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(
- reduceKeys, reduceValues,
- outputColumnNames, true, -1,
- reducePartKeys, order.toString(), -1),
- new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()), input),
- reduceSinkOutputRowResolver);
- rsOp.setColumnExprMap(colExprMap);
- return rsOp;
+ return false;
}
@SuppressWarnings("nls")
@@ -5861,48 +5995,16 @@ public class SemanticAnalyzer extends Ba
// currently. It doesn't matter whether he has asked to do
// map-side aggregation or not. Map side aggregation is turned off
List<ASTNode> commonDistinctExprs = getCommonDistinctExprs(qb, input);
- List<ASTNode> commonGbyKeys = getCommonGroupbyKeys(qb, input);
- LOG.warn("Common Gby keys:" + commonGbyKeys);
boolean optimizeMultiGroupBy = commonDistinctExprs != null;
- // Generate single MR job for multigroupby query if query has non-null common
- // groupby key set and there are zero or one common distinct expression.
- boolean singlemrMultiGroupBy =
- conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEMR)
- && commonGbyKeys != null && !commonGbyKeys.isEmpty() &&
- (!distinctExprsExists(qb) || commonDistinctExprs != null);
Operator curr = input;
- // If there are multiple group-bys, map-side aggregation is turned off,
- // and there are no filters.
- // if there is a common groupby key set, spray by the common groupby key set
- // and generate single mr job
- if (singlemrMultiGroupBy) {
- curr = createCommonReduceSink1(qb, input);
-
- RowResolver currRR = opParseCtx.get(curr).getRowResolver();
- // create a forward operator
- input = putOpInsertMap(OperatorFactory.getAndMakeChild(new ForwardDesc(),
- new RowSchema(currRR.getColumnInfos()), curr), currRR);
-
- for (String dest : ks) {
- curr = input;
- curr = genGroupByPlan1MRMultiGroupBy(dest, qb, curr);
- curr = genSelectPlan(dest, qb, curr);
- Integer limit = qbp.getDestLimit(dest);
- if (limit != null) {
- curr = genLimitMapRedPlan(dest, qb, curr, limit.intValue(), true);
- qb.getParseInfo().setOuterQueryLimit(limit.intValue());
- }
- curr = genFileSinkPlan(dest, qb, curr);
- }
- }
- // and if there is a single distinct, optimize that. Spray initially by the
+ // if there is a single distinct, optimize that. Spray initially by the
// distinct key,
// no computation at the mapper. Have multiple group by operators at the
// reducer - and then
// proceed
- else if (optimizeMultiGroupBy) {
+ if (optimizeMultiGroupBy) {
curr = createCommonReduceSink(qb, input);
RowResolver currRR = opParseCtx.get(curr).getRowResolver();
@@ -5922,107 +6024,160 @@ public class SemanticAnalyzer extends Ba
curr = genFileSinkPlan(dest, qb, curr);
}
} else {
- // Go over all the destination tables
- for (String dest : ks) {
- curr = input;
+ List<List<String>> commonGroupByDestGroups = null;
- if (qbp.getWhrForClause(dest) != null) {
- curr = genFilterPlan(dest, qb, curr);
+ // If we can put multiple group bys in a single reducer, determine suitable groups of
+ // expressions; otherwise, treat all the expressions as a single group
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEREDUCER)) {
+ try {
+ commonGroupByDestGroups = getCommonGroupByDestGroups(qb, curr);
+ } catch (SemanticException e) {
+ LOG.error("Failed to group clauses by common spray keys.", e);
}
+ }
- if (qbp.getAggregationExprsForClause(dest).size() != 0
- || getGroupByForClause(qbp, dest).size() > 0) {
- //multiple distincts is not supported with skew in data
- if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
- qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
- throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.
- getMsg());
- }
- // insert a select operator here used by the ColumnPruner to reduce
- // the data to shuffle
- curr = insertSelectAllPlanForGroupBy(dest, curr);
- if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
- if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
- curr = genGroupByPlanMapAggr1MR(dest, qb, curr);
- } else {
- curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
+ if (commonGroupByDestGroups == null) {
+ commonGroupByDestGroups = new ArrayList<List<String>>();
+ commonGroupByDestGroups.add(new ArrayList<String>(ks));
+ }
+
+ if (!commonGroupByDestGroups.isEmpty()) {
+
+ // Iterate over each group of subqueries with the same group by/distinct keys
+ for (List<String> commonGroupByDestGroup : commonGroupByDestGroups) {
+ if (commonGroupByDestGroup.isEmpty()) {
+ continue;
+ }
+
+ String firstDest = commonGroupByDestGroup.get(0);
+ // Constructs a standard group by plan if:
+ // There is no other subquery with the same group by/distinct keys or
+ // (There are no aggregations in a representative query for the group and
+ // There is no group by in that representative query) or
+ // The data is skewed or
+ // The conf variable used to control combining group bys into a single reducer is false
+ if (commonGroupByDestGroup.size() == 1 ||
+ (qbp.getAggregationExprsForClause(firstDest).size() == 0 &&
+ getGroupByForClause(qbp, firstDest).size() == 0) ||
+ conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW) ||
+ !conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEREDUCER)) {
+
+ // Go over all the destination tables
+ for (String dest : commonGroupByDestGroup) {
+ curr = input;
+
+ if (qbp.getWhrForClause(dest) != null) {
+ curr = genFilterPlan(dest, qb, curr);
+ }
+
+ if (qbp.getAggregationExprsForClause(dest).size() != 0
+ || getGroupByForClause(qbp, dest).size() > 0) {
+ //multiple distincts is not supported with skew in data
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
+ qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
+ throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.
+ getMsg());
+ }
+ // insert a select operator here used by the ColumnPruner to reduce
+ // the data to shuffle
+ curr = insertSelectAllPlanForGroupBy(curr);
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
+ if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
+ curr = genGroupByPlanMapAggr1MR(dest, qb, curr);
+ } else {
+ curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
+ }
+ } else if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
+ curr = genGroupByPlan2MR(dest, qb, curr);
+ } else {
+ curr = genGroupByPlan1MR(dest, qb, curr);
+ }
+ }
+
+ curr = genPostGroupByBodyPlan(curr, dest, qb);
}
- } else if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
- curr = genGroupByPlan2MR(dest, qb, curr);
} else {
- curr = genGroupByPlan1MR(dest, qb, curr);
+ curr = genGroupByPlan1MRMultiReduceGB(commonGroupByDestGroup, qb, input);
}
}
+ }
+ }
- // Insert HAVING plan here
- if (qbp.getHavingForClause(dest) != null) {
- if (getGroupByForClause(qbp, dest).size() == 0) {
- throw new SemanticException("HAVING specified without GROUP BY");
- }
- curr = genHavingPlan(dest, qb, curr);
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Created Body Plan for Query Block " + qb.getId());
+ }
- curr = genSelectPlan(dest, qb, curr);
- Integer limit = qbp.getDestLimit(dest);
+ return curr;
+ }
- if (qbp.getClusterByForClause(dest) != null
- || qbp.getDistributeByForClause(dest) != null
- || qbp.getOrderByForClause(dest) != null
- || qbp.getSortByForClause(dest) != null) {
+ private Operator genPostGroupByBodyPlan(Operator curr, String dest, QB qb)
+ throws SemanticException {
- int numReducers = -1;
+ QBParseInfo qbp = qb.getParseInfo();
- // Use only 1 reducer if order by is present
- if (qbp.getOrderByForClause(dest) != null) {
- numReducers = 1;
- }
+ // Insert HAVING plan here
+ if (qbp.getHavingForClause(dest) != null) {
+ if (getGroupByForClause(qbp, dest).size() == 0) {
+ throw new SemanticException("HAVING specified without GROUP BY");
+ }
+ curr = genHavingPlan(dest, qb, curr);
+ }
- curr = genReduceSinkPlan(dest, qb, curr, numReducers);
- }
+ curr = genSelectPlan(dest, qb, curr);
+ Integer limit = qbp.getDestLimit(dest);
- if (qbp.getIsSubQ()) {
- if (limit != null) {
- // In case of order by, only 1 reducer is used, so no need of
- // another shuffle
- curr = genLimitMapRedPlan(dest, qb, curr, limit.intValue(), qbp
- .getOrderByForClause(dest) != null ? false : true);
- }
- } else {
- curr = genConversionOps(dest, qb, curr);
- // exact limit can be taken care of by the fetch operator
- if (limit != null) {
- boolean extraMRStep = true;
-
- if (qb.getIsQuery() && qbp.getClusterByForClause(dest) == null
- && qbp.getSortByForClause(dest) == null) {
- extraMRStep = false;
- }
+ if (qbp.getClusterByForClause(dest) != null
+ || qbp.getDistributeByForClause(dest) != null
+ || qbp.getOrderByForClause(dest) != null
+ || qbp.getSortByForClause(dest) != null) {
- curr = genLimitMapRedPlan(dest, qb, curr, limit.intValue(),
- extraMRStep);
- qb.getParseInfo().setOuterQueryLimit(limit.intValue());
- }
- curr = genFileSinkPlan(dest, qb, curr);
- }
-
- // change curr ops row resolver's tab aliases to query alias if it
- // exists
- if (qb.getParseInfo().getAlias() != null) {
- RowResolver rr = opParseCtx.get(curr).getRowResolver();
- RowResolver newRR = new RowResolver();
- String alias = qb.getParseInfo().getAlias();
- for (ColumnInfo colInfo : rr.getColumnInfos()) {
- String name = colInfo.getInternalName();
- String[] tmp = rr.reverseLookup(name);
- newRR.put(alias, tmp[1], colInfo);
- }
- opParseCtx.get(curr).setRowResolver(newRR);
- }
+ int numReducers = -1;
+
+ // Use only 1 reducer if order by is present
+ if (qbp.getOrderByForClause(dest) != null) {
+ numReducers = 1;
}
+
+ curr = genReduceSinkPlan(dest, qb, curr, numReducers);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug("Created Body Plan for Query Block " + qb.getId());
+ if (qbp.getIsSubQ()) {
+ if (limit != null) {
+ // In case of order by, only 1 reducer is used, so no need of
+ // another shuffle
+ curr = genLimitMapRedPlan(dest, qb, curr, limit.intValue(), qbp
+ .getOrderByForClause(dest) != null ? false : true);
+ }
+ } else {
+ curr = genConversionOps(dest, qb, curr);
+ // exact limit can be taken care of by the fetch operator
+ if (limit != null) {
+ boolean extraMRStep = true;
+
+ if (qb.getIsQuery() && qbp.getClusterByForClause(dest) == null
+ && qbp.getSortByForClause(dest) == null) {
+ extraMRStep = false;
+ }
+
+ curr = genLimitMapRedPlan(dest, qb, curr, limit.intValue(),
+ extraMRStep);
+ qb.getParseInfo().setOuterQueryLimit(limit.intValue());
+ }
+ curr = genFileSinkPlan(dest, qb, curr);
+ }
+
+ // change curr ops row resolver's tab aliases to query alias if it
+ // exists
+ if (qb.getParseInfo().getAlias() != null) {
+ RowResolver rr = opParseCtx.get(curr).getRowResolver();
+ RowResolver newRR = new RowResolver();
+ String alias = qb.getParseInfo().getAlias();
+ for (ColumnInfo colInfo : rr.getColumnInfos()) {
+ String name = colInfo.getInternalName();
+ String[] tmp = rr.reverseLookup(name);
+ newRR.put(alias, tmp[1], colInfo);
+ }
+ opParseCtx.get(curr).setRowResolver(newRR);
}
return curr;
@@ -6466,40 +6621,40 @@ public class SemanticAnalyzer extends Ba
tsDesc.addVirtualCols(vcList);
String tblName = tab.getTableName();
- tableSpec tblSpec = qbp.getTableSpec(alias);
- Map<String, String> partSpec = tblSpec.getPartSpec();
+ tableSpec tblSpec = qbp.getTableSpec(alias);
+ Map<String, String> partSpec = tblSpec.getPartSpec();
- if (partSpec != null) {
- List<String> cols = new ArrayList<String>();
- cols.addAll(partSpec.keySet());
- tsDesc.setPartColumns(cols);
- }
-
- // Theoretically the key prefix could be any unique string shared
- // between TableScanOperator (when publishing) and StatsTask (when aggregating).
- // Here we use
- // table_name + partitionSec
- // as the prefix for easy of read during explain and debugging.
- // Currently, partition spec can only be static partition.
- String k = tblName + Path.SEPARATOR;
- tsDesc.setStatsAggPrefix(k);
-
- // set up WritenEntity for replication
- outputs.add(new WriteEntity(tab, true));
-
- // add WriteEntity for each matching partition
- if (tab.isPartitioned()) {
- if (partSpec == null) {
- throw new SemanticException(ErrorMsg.NEED_PARTITION_SPECIFICATION.getMsg());
- }
- List<Partition> partitions = qbp.getTableSpec().partitions;
- if (partitions != null) {
- for (Partition partn : partitions) {
- // inputs.add(new ReadEntity(partn)); // is this needed at all?
- outputs.add(new WriteEntity(partn, true));
+ if (partSpec != null) {
+ List<String> cols = new ArrayList<String>();
+ cols.addAll(partSpec.keySet());
+ tsDesc.setPartColumns(cols);
+ }
+
+ // Theoretically the key prefix could be any unique string shared
+ // between TableScanOperator (when publishing) and StatsTask (when aggregating).
+ // Here we use
+ // table_name + partitionSec
+ // as the prefix for ease of reading during explain and debugging.
+ // Currently, partition spec can only be static partition.
+ String k = tblName + Path.SEPARATOR;
+ tsDesc.setStatsAggPrefix(k);
+
+ // set up WriteEntity for replication
+ outputs.add(new WriteEntity(tab, true));
+
+ // add WriteEntity for each matching partition
+ if (tab.isPartitioned()) {
+ if (partSpec == null) {
+ throw new SemanticException(ErrorMsg.NEED_PARTITION_SPECIFICATION.getMsg());
+ }
+ List<Partition> partitions = qbp.getTableSpec().partitions;
+ if (partitions != null) {
+ for (Partition partn : partitions) {
+ // inputs.add(new ReadEntity(partn)); // is this needed at all?
+ outputs.add(new WriteEntity(partn, true));
}
}
- }
+ }
}
}
@@ -8206,7 +8361,7 @@ public class SemanticAnalyzer extends Ba
break;
}else{
mrtask.setLocalMode(true);
- }
+ }
} catch (IOException e) {
throw new SemanticException (e);
}
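To make the getCommonGroupByDestGroups/matchExprLists flow above concrete: destinations whose spray-key sets (group by keys plus distinct keys) match, independent of order, land in the same group and can share one ReduceSinkOperator. A self-contained sketch of just that grouping step, with hypothetical names and plain strings standing in for ExprNodeDescEqualityWrapper:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.TreeMap;

    public class DestGroupingSketch {
      // Clauses with equal spray-key sets land in the same list, and no clause
      // appears in more than one list, mirroring getCommonGroupByDestGroups.
      static List<List<String>> group(TreeMap<String, Set<String>> sprayKeysByDest) {
        List<Set<String>> keySets = new ArrayList<Set<String>>();
        List<List<String>> destGroups = new ArrayList<List<String>>();
        for (String dest : sprayKeysByDest.keySet()) {
          Set<String> sprayKeys = sprayKeysByDest.get(dest);
          boolean found = false;
          for (int i = 0; i < keySets.size(); i++) {
            if (keySets.get(i).equals(sprayKeys)) { // order-independent match
              destGroups.get(i).add(dest);
              found = true;
              break;
            }
          }
          if (!found) { // no match found, so start a new group
            keySets.add(sprayKeys);
            List<String> destGroup = new ArrayList<String>();
            destGroup.add(dest);
            destGroups.add(destGroup);
          }
        }
        return destGroups;
      }

      public static void main(String[] args) {
        TreeMap<String, Set<String>> m = new TreeMap<String, Set<String>>();
        m.put("dest1", new HashSet<String>(Arrays.asList("substr(key,1,1)")));
        m.put("dest2", new HashSet<String>(Arrays.asList("substr(key,1,1)")));
        m.put("dest3", new HashSet<String>(Arrays.asList("substr(key,1,1)", "substr(key,2,1)")));
        // Prints [[dest1, dest2], [dest3]]: the first two destinations can
        // share a single reducer; the third gets its own group.
        System.out.println(group(m));
      }
    }
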
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java Tue Jan 3 18:10:34 2012
@@ -22,9 +22,9 @@ import java.io.Serializable;
import java.util.List;
import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
/**
* ExprNodeDesc.
@@ -90,4 +90,31 @@ public abstract class ExprNodeDesc imple
return this.getClass().getName();
}
+ // This wraps an instance of an ExprNodeDesc, and makes equals work like isSame; see the
+ // comment on isSame
+ public static class ExprNodeDescEqualityWrapper {
+ private ExprNodeDesc exprNodeDesc;
+
+ public ExprNodeDescEqualityWrapper(ExprNodeDesc exprNodeDesc) {
+ this.exprNodeDesc = exprNodeDesc;
+ }
+
+ public ExprNodeDesc getExprNodeDesc() {
+ return exprNodeDesc;
+ }
+
+ public void setExprNodeDesc(ExprNodeDesc exprNodeDesc) {
+ this.exprNodeDesc = exprNodeDesc;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+
+ if (other == null || !(other instanceof ExprNodeDescEqualityWrapper)) {
+ return false;
+ }
+
+ return this.exprNodeDesc.isSame(((ExprNodeDescEqualityWrapper)other).getExprNodeDesc());
+ }
+ }
}
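The wrapper above exists so that deep expression equality (isSame) can drive ordinary collection operations such as List.contains, which is all matchExprLists and getDistinctExprs need. Note that it overrides equals without hashCode; that is harmless for the list-based lookups in this patch but would misbehave in hashed collections. A standalone sketch of the pattern (all names hypothetical), with a contract-respecting hashCode added:

    import java.util.ArrayList;
    import java.util.List;

    public class EqualityWrapperSketch {
      interface Expr { boolean isSame(Expr other); }

      static class ColumnExpr implements Expr {
        final String column;
        ColumnExpr(String column) { this.column = column; }
        public boolean isSame(Expr other) {
          return other instanceof ColumnExpr && column.equals(((ColumnExpr) other).column);
        }
      }

      // Adapts isSame() to equals() so List.contains() compares expressions deeply.
      static class Wrapper {
        private final Expr expr;
        Wrapper(Expr expr) { this.expr = expr; }
        @Override
        public boolean equals(Object other) {
          return other instanceof Wrapper && expr.isSame(((Wrapper) other).expr);
        }
        // A constant hash is legal (equal objects share it) and keeps the wrapper
        // safe in HashSet/HashMap, at the cost of hashing performance.
        @Override
        public int hashCode() { return 31; }
      }

      public static void main(String[] args) {
        List<Wrapper> keys = new ArrayList<Wrapper>();
        keys.add(new Wrapper(new ColumnExpr("key")));
        // Prints true: contains() now matches by isSame(), not object identity.
        System.out.println(keys.contains(new Wrapper(new ColumnExpr("key"))));
      }
    }
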
Modified: hive/trunk/ql/src/test/queries/clientpositive/groupby10.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby10.q?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby10.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby10.q Tue Jan 3 18:10:34 2012
@@ -1,4 +1,5 @@
set hive.map.aggr=false;
+set hive.multigroupby.singlereducer=false;
set hive.groupby.skewindata=true;
CREATE TABLE dest1(key INT, val1 INT, val2 INT);
@@ -19,7 +20,7 @@ INSERT OVERWRITE TABLE dest2 SELECT INPU
SELECT * from dest1;
SELECT * from dest2;
-set hive.multigroupby.singlemr=true;
+set hive.multigroupby.singlereducer=true;
EXPLAIN
FROM INPUT
Modified: hive/trunk/ql/src/test/queries/clientpositive/groupby7_map.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby7_map.q?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby7_map.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby7_map.q Tue Jan 3 18:10:34 2012
@@ -1,4 +1,5 @@
set hive.map.aggr=true;
+set hive.multigroupby.singlereducer=false;
set hive.groupby.skewindata=false;
set mapred.reduce.tasks=31;
Added: hive/trunk/ql/src/test/queries/clientpositive/groupby7_map_multi_single_reducer.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby7_map_multi_single_reducer.q?rev=1226903&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby7_map_multi_single_reducer.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby7_map_multi_single_reducer.q Tue Jan 3 18:10:34 2012
@@ -0,0 +1,21 @@
+set hive.map.aggr=true;
+set hive.groupby.skewindata=false;
+set mapred.reduce.tasks=31;
+
+CREATE TABLE DEST1(key INT, value STRING) STORED AS TEXTFILE;
+CREATE TABLE DEST2(key INT, value STRING) STORED AS TEXTFILE;
+
+SET hive.exec.compress.intermediate=true;
+SET hive.exec.compress.output=true;
+
+EXPLAIN
+FROM SRC
+INSERT OVERWRITE TABLE DEST1 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key
+INSERT OVERWRITE TABLE DEST2 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key;
+
+FROM SRC
+INSERT OVERWRITE TABLE DEST1 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key
+INSERT OVERWRITE TABLE DEST2 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key;
+
+SELECT DEST1.* FROM DEST1;
+SELECT DEST2.* FROM DEST2;
Modified: hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew.q?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew.q Tue Jan 3 18:10:34 2012
@@ -1,5 +1,5 @@
set hive.map.aggr=false;
-
+set hive.multigroupby.singlereducer=false;
set hive.groupby.skewindata=false;
set mapred.reduce.tasks=31;
Added: hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew_multi_single_reducer.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew_multi_single_reducer.q?rev=1226903&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew_multi_single_reducer.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew_multi_single_reducer.q Tue Jan 3 18:10:34 2012
@@ -0,0 +1,21 @@
+set hive.map.aggr=false;
+set hive.groupby.skewindata=false;
+set mapred.reduce.tasks=31;
+
+CREATE TABLE DEST1(key INT, value STRING) STORED AS TEXTFILE;
+CREATE TABLE DEST2(key INT, value STRING) STORED AS TEXTFILE;
+
+SET hive.exec.compress.intermediate=true;
+SET hive.exec.compress.output=true;
+
+EXPLAIN
+FROM SRC
+INSERT OVERWRITE TABLE DEST1 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key limit 10
+INSERT OVERWRITE TABLE DEST2 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key limit 10;
+
+FROM SRC
+INSERT OVERWRITE TABLE DEST1 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key limit 10
+INSERT OVERWRITE TABLE DEST2 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key limit 10;
+
+SELECT DEST1.* FROM DEST1;
+SELECT DEST2.* FROM DEST2;
Modified: hive/trunk/ql/src/test/queries/clientpositive/groupby8.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby8.q?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby8.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby8.q Tue Jan 3 18:10:34 2012
@@ -16,7 +16,7 @@ INSERT OVERWRITE TABLE DEST2 SELECT SRC.
SELECT DEST1.* FROM DEST1;
SELECT DEST2.* FROM DEST2;
-set hive.multigroupby.singlemr=true;
+set hive.multigroupby.singlereducer=false;
EXPLAIN
FROM SRC
Modified: hive/trunk/ql/src/test/queries/clientpositive/groupby9.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby9.q?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby9.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby9.q Tue Jan 3 18:10:34 2012
@@ -26,7 +26,7 @@ INSERT OVERWRITE TABLE DEST2 SELECT SRC.
SELECT DEST1.* FROM DEST1;
SELECT DEST2.* FROM DEST2;
-set hive.multigroupby.singlemr=true;
+set hive.multigroupby.singlereducer=false;
EXPLAIN
FROM SRC
Added: hive/trunk/ql/src/test/queries/clientpositive/groupby_complex_types_multi_single_reducer.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby_complex_types_multi_single_reducer.q?rev=1226903&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby_complex_types_multi_single_reducer.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby_complex_types_multi_single_reducer.q Tue Jan 3 18:10:34 2012
@@ -0,0 +1,17 @@
+set hive.multigroupby.singlereducer=true;
+
+CREATE TABLE DEST1(key ARRAY<STRING>, value BIGINT) STORED AS TEXTFILE;
+CREATE TABLE DEST2(key MAP<STRING, STRING>, value BIGINT) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM SRC
+INSERT OVERWRITE TABLE DEST1 SELECT ARRAY(SRC.key), COUNT(1) GROUP BY ARRAY(SRC.key) limit 10
+INSERT OVERWRITE TABLE DEST2 SELECT MAP(SRC.key, SRC.value), COUNT(1) GROUP BY MAP(SRC.key, SRC.value) limit 10;
+
+FROM SRC
+INSERT OVERWRITE TABLE DEST1 SELECT ARRAY(SRC.key), COUNT(1) GROUP BY ARRAY(SRC.key) limit 10
+INSERT OVERWRITE TABLE DEST2 SELECT MAP(SRC.key, SRC.value), COUNT(1) GROUP BY MAP(SRC.key, SRC.value) limit 10;
+
+SELECT DEST1.* FROM DEST1;
+SELECT DEST2.* FROM DEST2;
+
Added: hive/trunk/ql/src/test/queries/clientpositive/groupby_multi_single_reducer.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby_multi_single_reducer.q?rev=1226903&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby_multi_single_reducer.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby_multi_single_reducer.q Tue Jan 3 18:10:34 2012
@@ -0,0 +1,49 @@
+set hive.multigroupby.singlereducer=true;
+
+CREATE TABLE dest_g2(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE;
+CREATE TABLE dest_g3(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE;
+CREATE TABLE dest_g4(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE;
+CREATE TABLE dest_h2(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE;
+CREATE TABLE dest_h3(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM src
+INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) >= 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g3 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) < 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g4 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1);
+
+FROM src
+INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) >= 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g3 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) < 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g4 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1);
+
+SELECT * FROM dest_g2;
+SELECT * FROM dest_g3;
+SELECT * FROM dest_g4;
+
+EXPLAIN
+FROM src
+INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) >= 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g3 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) < 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g4 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_h2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1), substr(src.key,2,1) LIMIT 10
+INSERT OVERWRITE TABLE dest_h3 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) >= 5 GROUP BY substr(src.key,1,1), substr(src.key,2,1);
+
+FROM src
+INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) >= 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g3 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) < 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g4 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_h2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1), substr(src.key,2,1) LIMIT 10
+INSERT OVERWRITE TABLE dest_h3 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) >= 5 GROUP BY substr(src.key,1,1), substr(src.key,2,1);
+
+SELECT * FROM dest_g2;
+SELECT * FROM dest_g3;
+SELECT * FROM dest_g4;
+SELECT * FROM dest_h2;
+SELECT * FROM dest_h3;
+
+DROP TABLE dest_g2;
+DROP TABLE dest_g3;
+DROP TABLE dest_g4;
+DROP TABLE dest_h2;
+DROP TABLE dest_h3;
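Note: the new setting defaults to true, so the set statement at the top of this file makes the intent explicit rather than changing behavior, and the plans above all take the single-reducer path. For a side-by-side look at the pre-optimization shape, the same multi-insert can be explained with the flag off. A minimal sketch, assuming the dest_g2 and dest_g3 tables created above:

  set hive.multigroupby.singlereducer=false;
  EXPLAIN
  FROM src
  INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) >= 5 GROUP BY substr(src.key,1,1)
  INSERT OVERWRITE TABLE dest_g3 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) < 5 GROUP BY substr(src.key,1,1);

With the flag off, each INSERT branch would be expected to compile into its own map-reduce job rather than sharing one shuffle of src.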
Modified: hive/trunk/ql/src/test/queries/clientpositive/multigroupby_singlemr.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/multigroupby_singlemr.q?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/multigroupby_singlemr.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/multigroupby_singlemr.q Tue Jan 3 18:10:34 2012
@@ -1,5 +1,3 @@
-set hive.multigroupby.singlemr=true;
-
CREATE TABLE TBL(C1 INT, C2 INT, C3 INT, C4 INT);
CREATE TABLE DEST1(d1 INT, d2 INT) STORED AS TEXTFILE;
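Note: the dropped set statement references the pre-commit flag name; after the rename this test relies on the new setting, which groupby_multi_single_reducer.q above sets explicitly. A minimal sketch for scripts that want the old line's intent pinned under the new name:

  -- renamed flag; defaults to true after this commit
  set hive.multigroupby.singlereducer=true;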
Modified: hive/trunk/ql/src/test/results/clientpositive/groupby10.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby10.q.out?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/groupby10.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/groupby10.q.out Tue Jan 3 18:10:34 2012
@@ -343,10 +343,12 @@ ABSTRACT SYNTAX TREE:
STAGE DEPENDENCIES:
Stage-2 is a root stage
- Stage-0 depends on stages: Stage-2
- Stage-3 depends on stages: Stage-0
- Stage-1 depends on stages: Stage-2
- Stage-4 depends on stages: Stage-1
+ Stage-3 depends on stages: Stage-2
+ Stage-0 depends on stages: Stage-3
+ Stage-4 depends on stages: Stage-0
+ Stage-5 depends on stages: Stage-2
+ Stage-1 depends on stages: Stage-5
+ Stage-6 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-2
@@ -357,89 +359,106 @@ STAGE PLANS:
alias: input
Reduce Output Operator
key expressions:
- expr: key
- type: int
expr: substr(value, 5)
type: string
- sort order: ++
+ sort order: +
Map-reduce partition columns:
+ expr: substr(value, 5)
+ type: string
+ tag: -1
+ value expressions:
expr: key
type: int
- tag: -1
Reduce Operator Tree:
Forward
Group By Operator
aggregations:
- expr: count(KEY._col1)
- expr: count(DISTINCT KEY._col1)
+ expr: count(KEY._col0)
+ expr: count(DISTINCT KEY._col0)
bucketGroup: false
keys:
- expr: KEY._col0
+ expr: VALUE._col0
type: int
- mode: complete
+ mode: hash
outputColumnNames: _col0, _col1, _col2
- Select Operator
- expressions:
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ Group By Operator
+ aggregations:
+ expr: sum(KEY._col0)
+ expr: sum(DISTINCT KEY._col0)
+ bucketGroup: false
+ keys:
+ expr: VALUE._col0
+ type: int
+ mode: hash
+ outputColumnNames: _col0, _col1, _col2
+ File Output Operator
+ compressed: false
+ GlobalTableId: 0
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+
+ Stage: Stage-3
+ Map Reduce
+ Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+ Reduce Output Operator
+ key expressions:
expr: _col0
type: int
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col0
+ type: int
+ tag: -1
+ value expressions:
expr: _col1
type: bigint
expr: _col2
type: bigint
- outputColumnNames: _col0, _col1, _col2
- Select Operator
- expressions:
- expr: _col0
- type: int
- expr: UDFToInteger(_col1)
- type: int
- expr: UDFToInteger(_col2)
- type: int
- outputColumnNames: _col0, _col1, _col2
- File Output Operator
- compressed: false
- GlobalTableId: 1
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest1
- Group By Operator
- aggregations:
- expr: sum(KEY._col1)
- expr: sum(DISTINCT KEY._col1)
- bucketGroup: false
- keys:
- expr: KEY._col0
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations:
+ expr: count(VALUE._col0)
+ expr: count(VALUE._col1)
+ bucketGroup: false
+ keys:
+ expr: KEY._col0
+ type: int
+ mode: final
+ outputColumnNames: _col0, _col1, _col2
+ Select Operator
+ expressions:
+ expr: _col0
type: int
- mode: complete
+ expr: _col1
+ type: bigint
+ expr: _col2
+ type: bigint
outputColumnNames: _col0, _col1, _col2
Select Operator
expressions:
expr: _col0
type: int
- expr: _col1
- type: double
- expr: _col2
- type: double
+ expr: UDFToInteger(_col1)
+ type: int
+ expr: UDFToInteger(_col2)
+ type: int
outputColumnNames: _col0, _col1, _col2
- Select Operator
- expressions:
- expr: _col0
- type: int
- expr: UDFToInteger(_col1)
- type: int
- expr: UDFToInteger(_col2)
- type: int
- outputColumnNames: _col0, _col1, _col2
- File Output Operator
- compressed: false
- GlobalTableId: 2
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest2
+ File Output Operator
+ compressed: false
+ GlobalTableId: 1
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.dest1
Stage: Stage-0
Move Operator
@@ -451,9 +470,65 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest1
- Stage: Stage-3
+ Stage: Stage-4
Stats-Aggr Operator
+ Stage: Stage-5
+ Map Reduce
+ Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+ Reduce Output Operator
+ key expressions:
+ expr: _col0
+ type: int
+ sort order: +
+ Map-reduce partition columns:
+ expr: _col0
+ type: int
+ tag: -1
+ value expressions:
+ expr: _col1
+ type: double
+ expr: _col2
+ type: double
+ Reduce Operator Tree:
+ Group By Operator
+ aggregations:
+ expr: sum(VALUE._col0)
+ expr: sum(VALUE._col1)
+ bucketGroup: false
+ keys:
+ expr: KEY._col0
+ type: int
+ mode: final
+ outputColumnNames: _col0, _col1, _col2
+ Select Operator
+ expressions:
+ expr: _col0
+ type: int
+ expr: _col1
+ type: double
+ expr: _col2
+ type: double
+ outputColumnNames: _col0, _col1, _col2
+ Select Operator
+ expressions:
+ expr: _col0
+ type: int
+ expr: UDFToInteger(_col1)
+ type: int
+ expr: UDFToInteger(_col2)
+ type: int
+ outputColumnNames: _col0, _col1, _col2
+ File Output Operator
+ compressed: false
+ GlobalTableId: 2
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.dest2
+
Stage: Stage-1
Move Operator
tables:
@@ -464,7 +539,7 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest2
- Stage: Stage-4
+ Stage: Stage-6
Stats-Aggr Operator
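
Note: to summarize the plan change recorded in this .q.out diff, the old plan evaluated both aggregation trees in a single reduce stage in complete mode and wrote directly to default.dest1 and default.dest2. The new plan sprays on the distinct expression substr(value, 5) alone, forwards each reduce-side row to two hash-mode Group By Operators that write partial aggregates to intermediate sequence files, and then finishes each aggregation in its own final-mode map-reduce stage (Stage-3 and Stage-5) ahead of the Move and Stats-Aggr stages. A sketch of the query shape behind this plan, reconstructed from the operator trees above (table and column names are inferred from the plan, not copied from groupby10.q):

  FROM input
  INSERT OVERWRITE TABLE dest1
    SELECT input.key, count(substr(input.value, 5)), count(DISTINCT substr(input.value, 5))
    GROUP BY input.key
  INSERT OVERWRITE TABLE dest2
    SELECT input.key, sum(substr(input.value, 5)), sum(DISTINCT substr(input.value, 5))
    GROUP BY input.key;

The extra stages trade one reducer pass over the shared shuffle for two small follow-up jobs, which is what the revised STAGE DEPENDENCIES block at the top of the diff reflects.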