Posted to commits@hive.apache.org by ke...@apache.org on 2012/09/28 21:18:11 UTC
svn commit: r1391608 [1/4] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/
ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org/ap...
Author: kevinwilfong
Date: Fri Sep 28 19:18:10 2012
New Revision: 1391608
URL: http://svn.apache.org/viewvc?rev=1391608&view=rev
Log:
HIVE-3432. perform a map-only group by if grouping key matches the sorting properties of the table. (njain via kevinwilfong)
Added:
hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_1.q
hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_skew_1.q
hive/trunk/ql/src/test/results/clientpositive/groupby_sort_1.q.out
hive/trunk/ql/src/test/results/clientpositive/groupby_sort_skew_1.q.out
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
hive/trunk/ql/src/test/results/clientpositive/bucket_groupby.q.out
hive/trunk/ql/src/test/results/clientpositive/metadataonly1.q.out
hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Sep 28 19:18:10 2012
@@ -385,6 +385,7 @@ public class HiveConf extends Configurat
HIVEMAPAGGRMEMORYTHRESHOLD("hive.map.aggr.hash.force.flush.memory.threshold", (float) 0.9),
HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float) 0.5),
HIVEMULTIGROUPBYSINGLEREDUCER("hive.multigroupby.singlereducer", true),
+ HIVE_MAP_GROUPBY_SORT("hive.map.groupby.sorted", false),
// for hive udtf operator
HIVEUDTFAUTOPROGRESS("hive.udtf.auto.progress", false),
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Fri Sep 28 19:18:10 2012
@@ -453,6 +453,16 @@
job plan. If the multi group by query has common group by keys, it will be
optimized to generate single M/R job.</description>
</property>
+
+<property>
+ <name>hive.map.groupby.sorted</name>
+ <value>false</value>
+ <description>Whether to perform the group by in the mapper, using BucketizedHiveInputFormat,
+ if the bucketing/sorting properties of the table exactly match the grouping key. The only
+ downside is that it limits the number of mappers to the number of files.
+ </description>
+</property>
+
<property>
<name>hive.join.emit.interval</name>
<value>1000</value>
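
As a concrete illustration of the new flag, here is a minimal HiveQL sketch condensed from the groupby_sort_1.q test added by this commit (table name and layout are taken from that test):

set hive.enforce.bucketing = true;
set hive.enforce.sorting = true;
set hive.map.groupby.sorted = true;

-- T1 is bucketed and sorted on the same single key
CREATE TABLE T1(key STRING, val STRING)
CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;

-- The grouping key exactly matches T1's sorting key, so the group by can be
-- performed entirely in the mapper via BucketizedHiveInputFormat; since each
-- mapper then reads a whole file, mappers are capped at the number of files.
EXPLAIN
SELECT key, count(1) FROM T1 GROUP BY key;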
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Fri Sep 28 19:18:10 2012
@@ -321,7 +321,7 @@ public class ExecDriver extends Task<Map
inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
}
- if (getWork().isSmbJoin()) {
+ if (getWork().isUseBucketizedHiveInputFormat()) {
inpFormat = BucketizedHiveInputFormat.class.getName();
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Fri Sep 28 19:18:10 2012
@@ -165,4 +165,9 @@ public class FilterOperator extends Oper
public boolean supportSkewJoinOptimization() {
return true;
}
+
+ @Override
+ public boolean columnNamesRowResolvedCanBeObtained() {
+ return true;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Fri Sep 28 19:18:10 2012
@@ -120,8 +120,6 @@ public class GroupByOperator extends Ope
// Used by hash distinct aggregations when hashGrpKeyNotRedKey is true
protected transient HashSet<KeyWrapper> keysCurrentGroup;
- transient boolean bucketGroup;
-
transient boolean firstRow;
transient long totalMemory;
transient boolean hashAggr;
@@ -329,9 +327,8 @@ public class GroupByOperator extends Ope
objectInspectors.add(roi);
}
- bucketGroup = conf.getBucketGroup();
aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
- if (conf.getMode() != GroupByDesc.Mode.HASH || bucketGroup) {
+ if (conf.getMode() != GroupByDesc.Mode.HASH || conf.getBucketGroup()) {
aggregations = newAggregations();
hashAggr = false;
} else {
@@ -808,7 +805,6 @@ public class GroupByOperator extends Ope
boolean keysAreEqual = (currentKeys != null && newKeys != null)?
newKeys.equals(currentKeys) : false;
-
// Forward the current keys if needed for sort-based aggregation
if (currentKeys != null && !keysAreEqual) {
forward(currentKeys.getKeyArray(), aggregations);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Sep 28 19:18:10 2012
@@ -103,6 +103,8 @@ public abstract class Operator<T extends
seqId = 0;
}
+ private boolean useBucketizedHiveInputFormat;
+
public Operator() {
id = String.valueOf(seqId++);
}
@@ -708,6 +710,31 @@ public abstract class Operator<T extends
}
}
+ // Remove the child operators up to a certain depth.
+ // Return true if the removal was successful, false otherwise
+ public boolean removeChildren(int depth) {
+ Operator<? extends OperatorDesc> currOp = this;
+ for (int i = 0; i < depth; i++) {
+ // If there is more than one child at any level, don't do anything
+ if ((currOp.getChildOperators() == null) ||
+ (currOp.getChildOperators().size() > 1)) {
+ return false;
+ }
+ currOp = currOp.getChildOperators().get(0);
+ }
+
+ setChildOperators(currOp.getChildOperators());
+
+ List<Operator<? extends OperatorDesc>> parentOps =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ parentOps.add(this);
+
+ for (Operator<? extends OperatorDesc> op : currOp.getChildOperators()) {
+ op.setParentOperators(parentOps);
+ }
+ return true;
+ }
+
/**
* Replace one parent with another at the same position. Children of the new
* parent are not updated
@@ -1376,4 +1403,16 @@ public abstract class Operator<T extends
return ret;
}
+
+ public boolean columnNamesRowResolvedCanBeObtained() {
+ return false;
+ }
+
+ public boolean isUseBucketizedHiveInputFormat() {
+ return useBucketizedHiveInputFormat;
+ }
+
+ public void setUseBucketizedHiveInputFormat(boolean useBucketizedHiveInputFormat) {
+ this.useBucketizedHiveInputFormat = useBucketizedHiveInputFormat;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Fri Sep 28 19:18:10 2012
@@ -105,4 +105,9 @@ public class SelectOperator extends Oper
public boolean supportSkewJoinOptimization() {
return true;
}
+
+ @Override
+ public boolean columnNamesRowResolvedCanBeObtained() {
+ return true;
+ }
}
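
FilterOperator and SelectOperator are the only operators that override the new columnNamesRowResolvedCanBeObtained() hook, so the optimizer can trace grouping columns through filters and selects back to the table scan. A hedged sketch of a query shape that should therefore still qualify, assuming a table like T1 from the new tests (the filter value 'v1' is hypothetical):

set hive.map.groupby.sorted = true;

-- The filter between the table scan and the group by does not rename any
-- columns, so the match against T1's sort key (key) is preserved and the
-- group by can still be converted to a map-only one.
SELECT key, count(1) FROM T1 WHERE val = 'v1' GROUP BY key;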
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Fri Sep 28 19:18:10 2012
@@ -285,8 +285,8 @@ public final class GenMapRedUtils {
bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias());
bucketMJCxt.setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
bucketMJCxt.setBigTablePartSpecToFileMapping(
- currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());
- plan.setSmbJoin(currMapJoinOp instanceof SMBMapJoinOperator);
+ currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());
+ plan.setUseBucketizedHiveInputFormat(currMapJoinOp instanceof SMBMapJoinOperator);
}
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Fri Sep 28 19:18:10 2012
@@ -19,8 +19,9 @@
package org.apache.hadoop.hive.ql.optimizer;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -28,11 +29,13 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -53,15 +56,16 @@ import org.apache.hadoop.hive.ql.parse.S
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.util.StringUtils;
/**
- *this transformation does bucket group by optimization.
+ * This transformation does group by optimization. If the grouping key is a superset
+ * of the bucketing and sorting keys of the underlying table in the same order, the
+ * group by can be performed on the map-side completely.
*/
public class GroupByOptimizer implements Transform {
@@ -75,19 +79,31 @@ public class GroupByOptimizer implements
public ParseContext transform(ParseContext pctx) throws SemanticException {
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- GroupByOptProcCtx groupByOptimizeCtx = new GroupByOptProcCtx();
+ HiveConf conf = pctx.getConf();
- // process group-by pattern
- opRules.put(new RuleRegExp("R1",
- GroupByOperator.getOperatorName() + "%"
- + ReduceSinkOperator.getOperatorName() + "%"
- + GroupByOperator.getOperatorName() + "%"),
- getMapAggreSortedGroupbyProc(pctx));
+ if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
+ // process group-by pattern
+ opRules.put(new RuleRegExp("R1",
+ GroupByOperator.getOperatorName() + "%" +
+ ReduceSinkOperator.getOperatorName() + "%" +
+ GroupByOperator.getOperatorName() + "%"),
+ getMapSortedGroupbyProc(pctx));
+ } else {
+ // If hive.groupby.skewindata is set to true, the operator tree is GBY-RS-GBY-RS-GBY (top to bottom)
+ opRules.put(new RuleRegExp("R2",
+ GroupByOperator.getOperatorName() + "%" +
+ ReduceSinkOperator.getOperatorName() + "%" +
+ GroupByOperator.getOperatorName() + "%" +
+ ReduceSinkOperator.getOperatorName() + "%" +
+ GroupByOperator.getOperatorName() + "%"),
+ getMapSortedGroupbySkewProc(pctx));
+ }
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
- Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules,
- groupByOptimizeCtx);
+ Dispatcher disp =
+ new DefaultRuleDispatcher(getDefaultProc(), opRules,
+ new GroupByOptimizerContext(conf));
GraphWalker ogw = new DefaultGraphWalker(disp);
// Create a list of topop nodes
@@ -102,212 +118,322 @@ public class GroupByOptimizer implements
return new NodeProcessor() {
@Override
public Object process(Node nd, Stack<Node> stack,
- NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+ NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
return null;
}
};
}
- private NodeProcessor getMapAggreSortedGroupbyProc(ParseContext pctx) {
- return new BucketGroupByProcessor(pctx);
+ private NodeProcessor getMapSortedGroupbyProc(ParseContext pctx) {
+ return new SortGroupByProcessor(pctx);
}
+ private NodeProcessor getMapSortedGroupbySkewProc(ParseContext pctx) {
+ return new SortGroupBySkewProcessor(pctx);
+ }
+
+ public enum GroupByOptimizerSortMatch {
+ NO_MATCH, PARTIAL_MATCH, COMPLETE_MATCH
+ };
+
/**
- * BucketGroupByProcessor.
+ * SortGroupByProcessor.
*
*/
- public class BucketGroupByProcessor implements NodeProcessor {
+ public class SortGroupByProcessor implements NodeProcessor {
protected ParseContext pGraphContext;
- public BucketGroupByProcessor(ParseContext pGraphContext) {
+ public SortGroupByProcessor(ParseContext pGraphContext) {
this.pGraphContext = pGraphContext;
}
+ // Check if the group by operator has already been processed
+ protected boolean checkGroupByOperatorProcessed(
+ GroupByOptimizerContext groupBySortOptimizerContext,
+ GroupByOperator groupByOp) {
+
+ // The group by operator has already been processed
+ if (groupBySortOptimizerContext.getListGroupByOperatorsProcessed().contains(groupByOp)) {
+ return true;
+ }
+
+ groupBySortOptimizerContext.getListGroupByOperatorsProcessed().add(groupByOp);
+ return false;
+ }
+
+ protected void processGroupBy(GroupByOptimizerContext ctx,
+ Stack<Node> stack,
+ GroupByOperator groupByOp,
+ int depth) throws SemanticException {
+ HiveConf hiveConf = ctx.getConf();
+ GroupByOptimizerSortMatch match = checkSortGroupBy(stack, groupByOp);
+ boolean useMapperSort =
+ HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT);
+
+ if (useMapperSort) {
+ if (match == GroupByOptimizerSortMatch.COMPLETE_MATCH) {
+ convertGroupByMapSideSortedGroupBy(groupByOp, depth);
+ }
+ }
+ else if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||
+ (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
+ groupByOp.getConf().setBucketGroup(true);
+ }
+ }
+
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
// GBY,RS,GBY... (top to bottom)
- GroupByOperator op = (GroupByOperator) stack.get(stack.size() - 3);
- checkBucketGroupBy(op);
+ GroupByOperator groupByOp = (GroupByOperator) stack.get(stack.size() - 3);
+
+ GroupByOptimizerContext ctx = (GroupByOptimizerContext)procCtx;
+
+ if (!checkGroupByOperatorProcessed(ctx, groupByOp)) {
+ processGroupBy(ctx, stack, groupByOp, 2);
+ }
return null;
}
- private void checkBucketGroupBy(GroupByOperator curr)
- throws SemanticException {
+ // Should this group by be converted to a map-side group by? It should be if the grouping
+ // keys match the sorting keys of the base table for the group by
+ protected GroupByOptimizerSortMatch checkSortGroupBy(Stack<Node> stack,
+ GroupByOperator groupByOp)
+ throws SemanticException {
// if this is not a HASH groupby, return
- if (curr.getConf().getMode() != GroupByDesc.Mode.HASH) {
- return;
+ if (groupByOp.getConf().getMode() != GroupByDesc.Mode.HASH) {
+ return GroupByOptimizerSortMatch.NO_MATCH;
}
- Set<String> tblNames = pGraphContext.getGroupOpToInputTables().get(curr);
- if (tblNames == null || tblNames.size() == 0) {
- return;
- }
+ // Check all the operators in the stack. Currently, only SELECTs and FILTERs
+ // are allowed. The method 'columnNamesRowResolvedCanBeObtained' has been added for this purpose
+ Operator<? extends OperatorDesc> currOp = groupByOp;
+ currOp = currOp.getParentOperators().get(0);
+
+ while (true) {
+ if (currOp.getParentOperators() == null) {
+ break;
+ }
- boolean bucketGroupBy = true;
- GroupByDesc desc = curr.getConf();
- List<ExprNodeDesc> groupByKeys = new LinkedList<ExprNodeDesc>();
- groupByKeys.addAll(desc.getKeys());
- // compute groupby columns from groupby keys
- List<String> groupByCols = new ArrayList<String>();
- while (groupByKeys.size() > 0) {
- ExprNodeDesc node = groupByKeys.remove(0);
- if (node instanceof ExprNodeColumnDesc) {
- groupByCols.addAll(node.getCols());
- } else if ((node instanceof ExprNodeConstantDesc)
- || (node instanceof ExprNodeNullDesc)) {
- // nothing
- } else if (node instanceof ExprNodeFieldDesc) {
- groupByKeys.add(0, ((ExprNodeFieldDesc) node).getDesc());
- continue;
- } else if (node instanceof ExprNodeGenericFuncDesc) {
- ExprNodeGenericFuncDesc udfNode = ((ExprNodeGenericFuncDesc) node);
- GenericUDF udf = udfNode.getGenericUDF();
- if (!FunctionRegistry.isDeterministic(udf)) {
- return;
- }
- groupByKeys.addAll(0, udfNode.getChildExprs());
- } else {
- return;
+ if ((currOp.getParentOperators().size() > 1) ||
+ (!currOp.columnNamesRowResolvedCanBeObtained())) {
+ return GroupByOptimizerSortMatch.NO_MATCH;
}
+
+ currOp = currOp.getParentOperators().get(0);
}
- if (groupByCols.size() == 0) {
- return;
+ // currOp now points to the top-most tablescan operator
+ TableScanOperator tableScanOp = (TableScanOperator)currOp;
+ int stackPos = 0;
+ assert stack.get(0) == tableScanOp;
+
+ // Create a mapping from the group by columns to the table columns
+ Map<String, String> tableColsMapping = new HashMap<String, String>();
+ Set<String> constantCols = new HashSet<String>();
+ Table table = pGraphContext.getTopToTable().get(currOp);
+ for (FieldSchema col : table.getAllCols()) {
+ tableColsMapping.put(col.getName(), col.getName());
}
- for (String table : tblNames) {
- Operator<? extends OperatorDesc> topOp = pGraphContext.getTopOps().get(
- table);
- if (topOp == null || (!(topOp instanceof TableScanOperator))) {
- // this is in a sub-query.
- // In future, we need to infer subq's columns propery. For example
- // "select key, count(1)
- // from (from clustergroupbyselect key, value where ds='210') group by key, 3;",
- // even though the group by op is in a subquery, it can be changed to
- // bucket groupby.
- return;
- }
- TableScanOperator ts = (TableScanOperator) topOp;
- Table destTable = pGraphContext.getTopToTable().get(ts);
- if (destTable == null) {
- return;
- }
- if (!destTable.isPartitioned()) {
- List<String> bucketCols = destTable.getBucketCols();
- List<String> sortCols = Utilities
- .getColumnNamesFromSortCols(destTable.getSortCols());
- bucketGroupBy = matchBucketOrSortedColumns(groupByCols, bucketCols,
- sortCols);
- if (!bucketGroupBy) {
- return;
+ while (currOp != groupByOp) {
+ Operator<? extends OperatorDesc> processOp = currOp;
+ Set<String> newConstantCols = new HashSet<String>();
+ currOp = (Operator<? extends OperatorDesc>)(stack.get(++stackPos));
+
+ // Filters don't change the column names - so, no need to do anything for them
+ if (processOp instanceof SelectOperator) {
+ SelectOperator selectOp = (SelectOperator)processOp;
+ SelectDesc selectDesc = selectOp.getConf();
+
+ if (selectDesc.isSelStarNoCompute()) {
+ continue;
}
- } else {
- PrunedPartitionList partsList = null;
- try {
- partsList = pGraphContext.getOpToPartList().get(ts);
- if (partsList == null) {
- partsList = PartitionPruner.prune(destTable, pGraphContext
- .getOpToPartPruner().get(ts), pGraphContext.getConf(), table,
- pGraphContext.getPrunedPartitions());
- pGraphContext.getOpToPartList().put(ts, partsList);
+
+ // Only columns and constants can be selected
+ for (int pos = 0; pos < selectDesc.getColList().size(); pos++) {
+ String outputColumnName = selectDesc.getOutputColumnNames().get(pos);
+ if (constantCols.contains(outputColumnName)) {
+ tableColsMapping.remove(outputColumnName);
+ newConstantCols.add(outputColumnName);
+ continue;
+ }
+
+ ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
+ if (selectColList instanceof ExprNodeColumnDesc) {
+ String newValue =
+ tableColsMapping.get(((ExprNodeColumnDesc) selectColList).getColumn());
+ tableColsMapping.put(outputColumnName, newValue);
}
- } catch (HiveException e) {
- // Has to use full name to make sure it does not conflict with
- // org.apache.commons.lang.StringUtils
- LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
- throw new SemanticException(e.getMessage(), e);
+ else {
+ tableColsMapping.remove(outputColumnName);
+ if ((selectColList instanceof ExprNodeConstantDesc) ||
+ (selectColList instanceof ExprNodeNullDesc)) {
+ newConstantCols.add(outputColumnName);
+ }
+ }
+ }
+
+ constantCols = newConstantCols;
+ }
+ }
+
+ boolean sortGroupBy = true;
+ // compute groupby columns from groupby keys
+ List<String> groupByCols = new ArrayList<String>();
+ // If the group by expression is anything other than a list of columns,
+ // the sorting property is not obeyed
+ for (ExprNodeDesc expr : groupByOp.getConf().getKeys()) {
+ if (expr instanceof ExprNodeColumnDesc) {
+ String groupByKeyColumn = ((ExprNodeColumnDesc)expr).getColumn();
+ // ignore if it is a constant
+ if (constantCols.contains(groupByKeyColumn)) {
+ continue;
}
- List<Partition> parts = new ArrayList<Partition>();
- parts.addAll(partsList.getConfirmedPartns());
- parts.addAll(partsList.getUnknownPartns());
- for (Partition part : parts) {
- List<String> bucketCols = part.getBucketCols();
- List<String> sortCols = part.getSortColNames();
- bucketGroupBy = matchBucketOrSortedColumns(groupByCols, bucketCols,
- sortCols);
- if (!bucketGroupBy) {
- return;
+ else {
+ if (tableColsMapping.containsKey(groupByKeyColumn)) {
+ groupByCols.add(tableColsMapping.get(groupByKeyColumn));
+ }
+ else {
+ return GroupByOptimizerSortMatch.NO_MATCH;
}
}
}
+ // Constants and nulls are OK
+ else if ((expr instanceof ExprNodeConstantDesc) ||
+ (expr instanceof ExprNodeNullDesc)) {
+ continue;
+ } else {
+ return GroupByOptimizerSortMatch.NO_MATCH;
+ }
}
- curr.getConf().setBucketGroup(bucketGroupBy);
+ if (!table.isPartitioned()) {
+ List<String> sortCols = Utilities.getColumnNamesFromSortCols(table.getSortCols());
+ return matchSortColumns(groupByCols, sortCols);
+ } else {
+ PrunedPartitionList partsList = null;
+ try {
+ partsList = pGraphContext.getOpToPartList().get(tableScanOp);
+ if (partsList == null) {
+ partsList = PartitionPruner.prune(table,
+ pGraphContext.getOpToPartPruner().get(tableScanOp),
+ pGraphContext.getConf(),
+ table.getTableName(),
+ pGraphContext.getPrunedPartitions());
+ pGraphContext.getOpToPartList().put(tableScanOp, partsList);
+ }
+ } catch (HiveException e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw new SemanticException(e.getMessage(), e);
+ }
+
+ GroupByOptimizerSortMatch currentMatch = GroupByOptimizerSortMatch.COMPLETE_MATCH;
+ for (Partition part : partsList.getNotDeniedPartns()) {
+ List<String> sortCols = part.getSortColNames();
+ GroupByOptimizerSortMatch match = matchSortColumns(groupByCols, sortCols);
+ if (match == GroupByOptimizerSortMatch.NO_MATCH) {
+ return match;
+ }
+
+ if (match == GroupByOptimizerSortMatch.PARTIAL_MATCH) {
+ currentMatch = match;
+ }
+ }
+ return currentMatch;
+ }
}
/**
- * Given the group by keys, bucket columns, sort column, this method
+ * Given the group by keys and sort columns, this method
* determines if we can use sorted group by or not.
- *
- * We use bucket columns only when the sorted column set is empty and if all
- * group by columns are contained in bucket columns.
- *
- * If we can can not determine by looking at bucketed columns and the table
- * has sort columns, we resort to sort columns. We can use bucket group by
- * if the groupby column set is an exact prefix match of sort columns.
+ * We can use map-side sorted group by if the group by columns match the sorted columns
+ * in exactly the same order.
*
* @param groupByCols
- * @param bucketCols
* @param sortCols
* @return
* @throws SemanticException
*/
- private boolean matchBucketOrSortedColumns(List<String> groupByCols,
- List<String> bucketCols, List<String> sortCols) throws SemanticException {
- boolean ret = false;
+ private GroupByOptimizerSortMatch matchSortColumns(
+ List<String> groupByCols,
+ List<String> sortCols) throws SemanticException {
if (sortCols == null || sortCols.size() == 0) {
- ret = matchBucketColumns(groupByCols, bucketCols);
+ return GroupByOptimizerSortMatch.NO_MATCH;
}
- if (!ret && sortCols != null && sortCols.size() >= groupByCols.size()) {
- // check sort columns, if groupByCols is a prefix subset of sort
- // columns, we will use sorted group by. For example, if data is sorted
- // by column a, b, c, and a query wants to group by b,a, we will use
- // sorted group by. But if the query wants to groupby b,c, then sorted
- // group by can not be used.
- int num = groupByCols.size();
- for (int i = 0; i < num; i++) {
- if (sortCols.indexOf(groupByCols.get(i)) > (num - 1)) {
- return false;
- }
+ int num = sortCols.size() < groupByCols.size() ? sortCols.size() : groupByCols.size();
+ for (int i = 0; i < num; i++) {
+ if (!sortCols.get(i).equals(groupByCols.get(i))) {
+ return GroupByOptimizerSortMatch.NO_MATCH;
}
- return true;
}
- return ret;
+ return sortCols.size() == groupByCols.size() ?
+ GroupByOptimizerSortMatch.COMPLETE_MATCH : GroupByOptimizerSortMatch.PARTIAL_MATCH;
}
- /*
- * All group by columns should be contained in the bucket column set. And
- * the number of group by columns should be equal to number of bucket
- * columns.
- */
- private boolean matchBucketColumns(List<String> grpCols,
- List<String> tblBucketCols) throws SemanticException {
-
- if (tblBucketCols == null || tblBucketCols.size() == 0
- || grpCols.size() == 0 || grpCols.size() != tblBucketCols.size()) {
- return false;
- }
-
- for (int i = 0; i < grpCols.size(); i++) {
- String tblCol = grpCols.get(i);
- if (!tblBucketCols.contains(tblCol)) {
- return false;
- }
+ // Convert the group by to a map-side group by
+ // The operators up to the specified depth are removed from the tree.
+ protected void convertGroupByMapSideSortedGroupBy(GroupByOperator groupByOp, int depth) {
+ if (groupByOp.removeChildren(depth)) {
+ // Use bucketized hive input format - that makes sure that one mapper reads the entire file
+ groupByOp.setUseBucketizedHiveInputFormat(true);
+ groupByOp.getConf().setMode(GroupByDesc.Mode.FINAL);
}
- return true;
}
}
/**
- * GroupByOptProcCtx.
+ * SortGroupBySkewProcessor.
*
*/
- public class GroupByOptProcCtx implements NodeProcessorCtx {
+ public class SortGroupBySkewProcessor extends SortGroupByProcessor {
+ public SortGroupBySkewProcessor(ParseContext pGraphContext) {
+ super(pGraphContext);
+ }
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ // GBY,RS,GBY,RS,GBY... (top to bottom)
+ GroupByOperator groupByOp = (GroupByOperator) stack.get(stack.size() - 5);
+ GroupByOptimizerContext ctx = (GroupByOptimizerContext)procCtx;
+
+ if (!checkGroupByOperatorProcessed(ctx, groupByOp)) {
+ processGroupBy(ctx, stack, groupByOp, 4);
+ }
+ return null;
+ }
+ }
+
+ public class GroupByOptimizerContext implements NodeProcessorCtx {
+ List<GroupByOperator> listGroupByOperatorsProcessed;
+ HiveConf conf;
+
+ public GroupByOptimizerContext(HiveConf conf) {
+ this.conf = conf;
+ listGroupByOperatorsProcessed = new ArrayList<GroupByOperator>();
+ }
+
+ public List<GroupByOperator> getListGroupByOperatorsProcessed() {
+ return listGroupByOperatorsProcessed;
+ }
+
+ public void setListGroupByOperatorsProcessed(
+ List<GroupByOperator> listGroupByOperatorsProcessed) {
+ this.listGroupByOperatorsProcessed = listGroupByOperatorsProcessed;
+ }
+
+ public HiveConf getConf() {
+ return conf;
+ }
+
+ public void setConf(HiveConf conf) {
+ this.conf = conf;
+ }
}
}
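
Putting the pieces of this file together: checkSortGroupBy() classifies each group by against the table's sort columns as COMPLETE_MATCH, PARTIAL_MATCH or NO_MATCH. With hive.map.groupby.sorted=true, a COMPLETE_MATCH triggers convertGroupByMapSideSortedGroupBy()/removeChildren(), yielding a map-only group by in FINAL mode; with the flag off, a COMPLETE or PARTIAL match merely sets bucketGroup (sort-based rather than hash aggregation). Note also that matchSortColumns() now returns NO_MATCH when the table has no sort columns, whereas the old matchBucketOrSortedColumns() fell back to bucket columns; that is why the bucket_groupby.q.out plans at the end of this patch flip from bucketGroup: true to bucketGroup: false. A sketch of the three outcomes, using T2 from the new tests (bucketed and sorted by (key, val)):

set hive.map.groupby.sorted = true;

-- COMPLETE_MATCH: grouping columns equal the sort columns, in order;
-- the plan collapses to a single map-only group by
SELECT key, val, count(1) FROM T2 GROUP BY key, val;

-- PARTIAL_MATCH: (key) is a strict prefix of (key, val); with the flag on
-- this is left unchanged, with the flag off bucketGroup would be set
SELECT key, count(1) FROM T2 GROUP BY key;

-- NO_MATCH: (val) is not a prefix of (key, val); never optimized
SELECT val, count(1) FROM T2 GROUP BY val;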
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Fri Sep 28 19:18:10 2012
@@ -63,7 +63,8 @@ public class Optimizer {
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGBYUSINGINDEX)) {
transformations.add(new RewriteGBUsingIndex());
}
- if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGROUPBY)) {
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGROUPBY) ||
+ HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT)) {
transformations.add(new GroupByOptimizer());
}
transformations.add(new SamplePruner());
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Fri Sep 28 19:18:10 2012
@@ -7263,6 +7263,12 @@ public class SemanticAnalyzer extends Ba
setKeyDescTaskTree(rootTask);
}
+ // If a task contains an operator which instructs that BucketizedHiveInputFormat
+ // be used, set it on the task's MapredWork
+ for (Task<? extends Serializable> rootTask : rootTasks) {
+ setInputFormat(rootTask);
+ }
+
PhysicalContext physicalContext = new PhysicalContext(conf,
getParseContext(), ctx, rootTasks, fetchTask);
PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
@@ -7443,6 +7449,43 @@ public class SemanticAnalyzer extends Ba
}
}
+ private void setInputFormat(MapredWork work, Operator<? extends OperatorDesc> op) {
+ if (op.isUseBucketizedHiveInputFormat()) {
+ work.setUseBucketizedHiveInputFormat(true);
+ return;
+ }
+
+ if (op.getChildOperators() != null) {
+ for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
+ setInputFormat(work, childOp);
+ }
+ }
+ }
+
+ // loop over all the tasks recursively
+ private void setInputFormat(Task<? extends Serializable> task) {
+ if (task instanceof ExecDriver) {
+ MapredWork work = (MapredWork) task.getWork();
+ HashMap<String, Operator<? extends OperatorDesc>> opMap = work.getAliasToWork();
+ if (!opMap.isEmpty()) {
+ for (Operator<? extends OperatorDesc> op : opMap.values()) {
+ setInputFormat(work, op);
+ }
+ }
+ } else if (task instanceof ConditionalTask) {
+ List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task).getListTasks();
+ for (Task<? extends Serializable> tsk : listTasks) {
+ setInputFormat(tsk);
+ }
+ }
+
+ if (task.getChildTasks() != null) {
+ for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+ setInputFormat(childTask);
+ }
+ }
+ }
+
// loop over all the tasks recursively
private void setKeyDescTaskTree(Task<? extends Serializable> task) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java Fri Sep 28 19:18:10 2012
@@ -52,6 +52,8 @@ public class GroupByDesc extends Abstrac
private Mode mode;
private boolean groupKeyNotReductionKey;
+
+ // no hash aggregations for group by
private boolean bucketGroup;
private ArrayList<ExprNodeDesc> keys;
@@ -177,8 +179,8 @@ public class GroupByDesc extends Abstrac
return bucketGroup;
}
- public void setBucketGroup(boolean dataSorted) {
- bucketGroup = dataSorted;
+ public void setBucketGroup(boolean bucketGroup) {
+ this.bucketGroup = bucketGroup;
}
/**
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Fri Sep 28 19:18:10 2012
@@ -89,7 +89,7 @@ public class MapredWork extends Abstract
// used to indicate the input is sorted, and so a BinarySearchRecordReader should be used
private boolean inputFormatSorted = false;
- private transient boolean smbJoin;
+ private transient boolean useBucketizedHiveInputFormat;
public MapredWork() {
aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
@@ -488,11 +488,11 @@ public class MapredWork extends Abstract
return returnList;
}
- public boolean isSmbJoin() {
- return smbJoin;
+ public boolean isUseBucketizedHiveInputFormat() {
+ return useBucketizedHiveInputFormat;
}
- public void setSmbJoin(boolean smbJoin) {
- this.smbJoin = smbJoin;
+ public void setUseBucketizedHiveInputFormat(boolean useBucketizedHiveInputFormat) {
+ this.useBucketizedHiveInputFormat = useBucketizedHiveInputFormat;
}
}
Added: hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_1.q?rev=1391608&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_1.q Fri Sep 28 19:18:10 2012
@@ -0,0 +1,282 @@
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+set hive.exec.reducers.max = 10;
+set hive.map.groupby.sorted=true;
+
+CREATE TABLE T1(key STRING, val STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+-- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 select key, val from T1;
+
+CREATE TABLE outputTbl1(key int, cnt int);
+
+-- The plan should be converted to a map-side group by if the group by key
+-- matches the sorted key
+-- adding an order by at the end to make the test results deterministic
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T1 GROUP BY key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T1 GROUP BY key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+CREATE TABLE outputTbl2(key1 int, key2 string, cnt int);
+
+-- no map-side group by even if the group by key is a superset of the sorted key
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl2
+SELECT key, val, count(1) FROM T1 GROUP BY key, val;
+
+INSERT OVERWRITE TABLE outputTbl2
+SELECT key, val, count(1) FROM T1 GROUP BY key, val;
+
+SELECT * FROM outputTbl2 ORDER BY key1, key2;
+
+-- It should work for sub-queries
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM (SELECT key, val FROM T1) subq1 GROUP BY key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM (SELECT key, val FROM T1) subq1 GROUP BY key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- It should work for sub-queries with column aliases
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT k, count(1) FROM (SELECT key as k, val as v FROM T1) subq1 GROUP BY k;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT k, count(1) FROM (SELECT key as k, val as v FROM T1) subq1 GROUP BY k;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+CREATE TABLE outputTbl3(key1 int, key2 int, cnt int);
+
+-- The plan should be converted to a map-side group by if the group by key contains a constant followed
+-- by a match to the sorted key
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl3
+SELECT 1, key, count(1) FROM T1 GROUP BY 1, key;
+
+INSERT OVERWRITE TABLE outputTbl3
+SELECT 1, key, count(1) FROM T1 GROUP BY 1, key;
+
+SELECT * FROM outputTbl3 ORDER BY key1, key2;
+
+CREATE TABLE outputTbl4(key1 int, key2 int, key3 string, cnt int);
+
+-- no map-side group by if the group by key contains a constant followed by another column
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T1 GROUP BY key, 1, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T1 GROUP BY key, 1, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+-- no map-side group by if the group by key contains a function
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl3
+SELECT key, key + 1, count(1) FROM T1 GROUP BY key, key + 1;
+
+INSERT OVERWRITE TABLE outputTbl3
+SELECT key, key + 1, count(1) FROM T1 GROUP BY key, key + 1;
+
+SELECT * FROM outputTbl3 ORDER BY key1, key2;
+
+-- it should not matter what follows the group by
+-- test various cases
+
+-- group by followed by another group by
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key + key, sum(cnt) from
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+group by key + key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key + key, sum(cnt) from
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+group by key + key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a union
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) FROM T1 GROUP BY key
+ UNION ALL
+SELECT key, count(1) FROM T1 GROUP BY key
+) subq1;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) FROM T1 GROUP BY key
+ UNION ALL
+SELECT key, count(1) FROM T1 GROUP BY key
+) subq1;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a union where one of the sub-queries is a map-side group by
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) FROM T1 GROUP BY key
+ UNION ALL
+SELECT key + key as key, count(1) FROM T1 GROUP BY key + key
+) subq1;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) as cnt FROM T1 GROUP BY key
+ UNION ALL
+SELECT key + key as key, count(1) as cnt FROM T1 GROUP BY key + key
+) subq1;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a join
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT subq1.key, subq1.cnt+subq2.cnt FROM
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+JOIN
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq2
+ON subq1.key = subq2.key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT subq1.key, subq1.cnt+subq2.cnt FROM
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+JOIN
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq2
+ON subq1.key = subq2.key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a join where one of the sub-queries can be performed in the mapper
+EXPLAIN EXTENDED
+SELECT * FROM
+(SELECT key, count(1) FROM T1 GROUP BY key) subq1
+JOIN
+(SELECT key, val, count(1) FROM T1 GROUP BY key, val) subq2
+ON subq1.key = subq2.key;
+
+CREATE TABLE T2(key STRING, val STRING)
+CLUSTERED BY (key, val) SORTED BY (key, val) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+-- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T2 select key, val from T1;
+
+-- no map-side sort group by if the group by key is a prefix of the sorted keys
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T2 GROUP BY key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T2 GROUP BY key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- The plan should be converted to a map-side group by if the group by key contains a constant in between the
+-- sorted keys
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T2 GROUP BY key, 1, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T2 GROUP BY key, 1, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+CREATE TABLE outputTbl5(key1 int, key2 int, key3 string, key4 int, cnt int);
+
+-- The plan should be converted to a map-side group by if the group by key contains a constant in between the
+-- sorted keys followed by anything
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl5
+SELECT key, 1, val, 2, count(1) FROM T2 GROUP BY key, 1, val, 2;
+
+INSERT OVERWRITE TABLE outputTbl5
+SELECT key, 1, val, 2, count(1) FROM T2 GROUP BY key, 1, val, 2;
+
+SELECT * FROM outputTbl5
+ORDER BY key1, key2, key3, key4;
+
+-- constants from sub-queries should work fine
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, constant, val, count(1) from
+(SELECT key, 1 as constant, val from T2)subq
+group by key, constant, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, constant, val, count(1) from
+(SELECT key, 1 as constant, val from T2)subq
+group by key, constant, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+-- multiple levels of constants from sub-queries should work fine
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl4
+select key, constant3, val, count(1) from
+(
+SELECT key, constant as constant2, val, 2 as constant3 from
+(SELECT key, 1 as constant, val from T2)subq
+)subq2
+group by key, constant3, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+select key, constant3, val, count(1) from
+(
+SELECT key, constant as constant2, val, 2 as constant3 from
+(SELECT key, 1 as constant, val from T2)subq
+)subq2
+group by key, constant3, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+set hive.map.aggr=true;
+set hive.multigroupby.singlereducer=false;
+set mapred.reduce.tasks=31;
+
+CREATE TABLE DEST1(key INT, cnt INT);
+CREATE TABLE DEST2(key INT, val STRING, cnt INT);
+
+SET hive.exec.compress.intermediate=true;
+SET hive.exec.compress.output=true;
+
+EXPLAIN
+FROM T2
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+FROM T2
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+select * from DEST1 ORDER BY key, cnt;
+select * from DEST2 ORDER BY key, val, val;
+
+-- multi-table insert with a sub-query
+EXPLAIN
+FROM (select key, val from T2 where key = 8) x
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+FROM (select key, val from T2 where key = 8) x
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+select * from DEST1 ORDER BY key, cnt;
+select * from DEST2 ORDER BY key, val, cnt;
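
The skew variant that follows repeats the same queries with hive.groupby.skewindata=true. That setting normally produces a two-stage GBY-RS-GBY-RS-GBY plan, which is exactly the five-operator pattern the R2 rule above matches; on a complete sort match, removeChildren(4) collapses it to the same single map-only group by. A condensed sketch (same T1 as above):

set hive.map.groupby.sorted = true;
set hive.groupby.skewindata = true;

-- Without this optimization the skew setting forces two MR jobs (a partial
-- aggregation spread over random reducers, then a final aggregation); with a
-- complete sort match the whole pipeline becomes one map-only FINAL group by.
EXPLAIN
SELECT key, count(1) FROM T1 GROUP BY key;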
Added: hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_skew_1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_skew_1.q?rev=1391608&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_skew_1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_skew_1.q Fri Sep 28 19:18:10 2012
@@ -0,0 +1,283 @@
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+set hive.exec.reducers.max = 10;
+set hive.map.groupby.sorted=true;
+set hive.groupby.skewindata=true;
+
+CREATE TABLE T1(key STRING, val STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+-- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 select key, val from T1;
+
+CREATE TABLE outputTbl1(key int, cnt int);
+
+-- The plan should be converted to a map-side group by if the group by key
+-- matches the sorted key
+-- adding an order by at the end to make the test results deterministic
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T1 GROUP BY key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T1 GROUP BY key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+CREATE TABLE outputTbl2(key1 int, key2 string, cnt int);
+
+-- no map-side group by even if the group by key is a superset of the sorted key
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl2
+SELECT key, val, count(1) FROM T1 GROUP BY key, val;
+
+INSERT OVERWRITE TABLE outputTbl2
+SELECT key, val, count(1) FROM T1 GROUP BY key, val;
+
+SELECT * FROM outputTbl2 ORDER BY key1, key2;
+
+-- It should work for sub-queries
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM (SELECT key, val FROM T1) subq1 GROUP BY key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM (SELECT key, val FROM T1) subq1 GROUP BY key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- It should work for sub-queries with column aliases
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT k, count(1) FROM (SELECT key as k, val as v FROM T1) subq1 GROUP BY k;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT k, count(1) FROM (SELECT key as k, val as v FROM T1) subq1 GROUP BY k;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+CREATE TABLE outputTbl3(key1 int, key2 int, cnt int);
+
+-- The plan should be converted to a map-side group by if the group by key contains a constant followed
+-- by a match to the sorted key
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl3
+SELECT 1, key, count(1) FROM T1 GROUP BY 1, key;
+
+INSERT OVERWRITE TABLE outputTbl3
+SELECT 1, key, count(1) FROM T1 GROUP BY 1, key;
+
+SELECT * FROM outputTbl3 ORDER BY key1, key2;
+
+CREATE TABLE outputTbl4(key1 int, key2 int, key3 string, cnt int);
+
+-- no map-side group by if the group by key contains a constant followed by another column
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T1 GROUP BY key, 1, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T1 GROUP BY key, 1, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+-- no map-side group by if the group by key contains a function
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl3
+SELECT key, key + 1, count(1) FROM T1 GROUP BY key, key + 1;
+
+INSERT OVERWRITE TABLE outputTbl3
+SELECT key, key + 1, count(1) FROM T1 GROUP BY key, key + 1;
+
+SELECT * FROM outputTbl3 ORDER BY key1, key2;
+
+-- it should not matter what follows the group by
+-- test various cases
+
+-- group by followed by another group by
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key + key, sum(cnt) from
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+group by key + key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key + key, sum(cnt) from
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+group by key + key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a union
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) FROM T1 GROUP BY key
+ UNION ALL
+SELECT key, count(1) FROM T1 GROUP BY key
+) subq1;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) FROM T1 GROUP BY key
+ UNION ALL
+SELECT key, count(1) FROM T1 GROUP BY key
+) subq1;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a union where one of the sub-queries is a map-side group by
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) FROM T1 GROUP BY key
+ UNION ALL
+SELECT key + key as key, count(1) FROM T1 GROUP BY key + key
+) subq1;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) as cnt FROM T1 GROUP BY key
+ UNION ALL
+SELECT key + key as key, count(1) as cnt FROM T1 GROUP BY key + key
+) subq1;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a join
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT subq1.key, subq1.cnt+subq2.cnt FROM
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+JOIN
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq2
+ON subq1.key = subq2.key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT subq1.key, subq1.cnt+subq2.cnt FROM
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+JOIN
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq2
+ON subq1.key = subq2.key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a join where one of the sub-queries can be performed in the mapper
+EXPLAIN EXTENDED
+SELECT * FROM
+(SELECT key, count(1) FROM T1 GROUP BY key) subq1
+JOIN
+(SELECT key, val, count(1) FROM T1 GROUP BY key, val) subq2
+ON subq1.key = subq2.key;
+
+CREATE TABLE T2(key STRING, val STRING)
+CLUSTERED BY (key, val) SORTED BY (key, val) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+-- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T2 select key, val from T1;
+
+-- no map-side sort group by if the group by key is a prefix of the sorted keys
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T2 GROUP BY key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T2 GROUP BY key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- The plan should be converted to a map-side group by if the group by key contains a constant in between the
+-- sorted keys
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T2 GROUP BY key, 1, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T2 GROUP BY key, 1, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+CREATE TABLE outputTbl5(key1 int, key2 int, key3 string, key4 int, cnt int);
+
+-- The plan should be converted to a map-side group by if the group by key contains a constant in between the
+-- sorted keys followed by anything
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl5
+SELECT key, 1, val, 2, count(1) FROM T2 GROUP BY key, 1, val, 2;
+
+INSERT OVERWRITE TABLE outputTbl5
+SELECT key, 1, val, 2, count(1) FROM T2 GROUP BY key, 1, val, 2;
+
+SELECT * FROM outputTbl5
+ORDER BY key1, key2, key3, key4;
+
+-- constants from sub-queries should work fine
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, constant, val, count(1) from
+(SELECT key, 1 as constant, val from T2)subq
+group by key, constant, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, constant, val, count(1) from
+(SELECT key, 1 as constant, val from T2)subq
+group by key, constant, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+-- multiple levels of constants from sub-queries should work fine
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl4
+select key, constant3, val, count(1) from
+(
+SELECT key, constant as constant2, val, 2 as constant3 from
+(SELECT key, 1 as constant, val from T2)subq
+)subq2
+group by key, constant3, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+select key, constant3, val, count(1) from
+(
+SELECT key, constant as constant2, val, 2 as constant3 from
+(SELECT key, 1 as constant, val from T2)subq
+)subq2
+group by key, constant3, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+set hive.map.aggr=true;
+set hive.multigroupby.singlereducer=false;
+set mapred.reduce.tasks=31;
+
+CREATE TABLE DEST1(key INT, cnt INT);
+CREATE TABLE DEST2(key INT, val STRING, cnt INT);
+
+SET hive.exec.compress.intermediate=true;
+SET hive.exec.compress.output=true;
+
+EXPLAIN
+FROM T2
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+FROM T2
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+select * from DEST1 ORDER BY key, cnt;
+select * from DEST2 ORDER BY key, val, val;
+
+-- multi-table insert with a sub-query
+EXPLAIN
+FROM (select key, val from T2 where key = 8) x
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+FROM (select key, val from T2 where key = 8) x
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+select * from DEST1 ORDER BY key, cnt;
+select * from DEST2 ORDER BY key, val, cnt;
Modified: hive/trunk/ql/src/test/results/clientpositive/bucket_groupby.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/bucket_groupby.q.out?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/bucket_groupby.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/bucket_groupby.q.out Fri Sep 28 19:18:10 2012
@@ -60,7 +60,7 @@ STAGE PLANS:
Group By Operator
aggregations:
expr: count(1)
- bucketGroup: true
+ bucketGroup: false
keys:
expr: key
type: string
@@ -185,7 +185,7 @@ STAGE PLANS:
Group By Operator
aggregations:
expr: count(1)
- bucketGroup: true
+ bucketGroup: false
keys:
expr: key
type: string
@@ -289,7 +289,7 @@ STAGE PLANS:
Group By Operator
aggregations:
expr: count(1)
- bucketGroup: true
+ bucketGroup: false
keys:
expr: length(key)
type: int
@@ -384,7 +384,7 @@ STAGE PLANS:
Group By Operator
aggregations:
expr: count(1)
- bucketGroup: true
+ bucketGroup: false
keys:
expr: abs(length(key))
type: int
@@ -481,7 +481,7 @@ STAGE PLANS:
Group By Operator
aggregations:
expr: count(1)
- bucketGroup: true
+ bucketGroup: false
keys:
expr: key
type: string
@@ -700,7 +700,7 @@ STAGE PLANS:
Group By Operator
aggregations:
expr: count(1)
- bucketGroup: true
+ bucketGroup: false
keys:
expr: key
type: string
@@ -1102,7 +1102,7 @@ STAGE PLANS:
Group By Operator
aggregations:
expr: count(1)
- bucketGroup: true
+ bucketGroup: false
keys:
expr: key
type: string