You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ke...@apache.org on 2012/09/28 21:18:11 UTC

svn commit: r1391608 [1/4] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org/ap...

Author: kevinwilfong
Date: Fri Sep 28 19:18:10 2012
New Revision: 1391608

URL: http://svn.apache.org/viewvc?rev=1391608&view=rev
Log:
HIVE-3432. perform a map-only group by if grouping key matches the sorting properties of the table. (njain via kevinwilfong)

Added:
    hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_1.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_skew_1.q
    hive/trunk/ql/src/test/results/clientpositive/groupby_sort_1.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_sort_skew_1.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
    hive/trunk/ql/src/test/results/clientpositive/bucket_groupby.q.out
    hive/trunk/ql/src/test/results/clientpositive/metadataonly1.q.out
    hive/trunk/ql/src/test/results/clientpositive/ql_rewrite_gbtoidx.q.out

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Sep 28 19:18:10 2012
@@ -385,6 +385,7 @@ public class HiveConf extends Configurat
     HIVEMAPAGGRMEMORYTHRESHOLD("hive.map.aggr.hash.force.flush.memory.threshold", (float) 0.9),
     HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float) 0.5),
     HIVEMULTIGROUPBYSINGLEREDUCER("hive.multigroupby.singlereducer", true),
+    HIVE_MAP_GROUPBY_SORT("hive.map.groupby.sorted", false),
 
     // for hive udtf operator
     HIVEUDTFAUTOPROGRESS("hive.udtf.auto.progress", false),

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Fri Sep 28 19:18:10 2012
@@ -453,6 +453,16 @@
   job plan. If the multi group by query has common group by keys, it will be
   optimized to generate single M/R job.</description>
 </property>
+
+<property>
+  <name>hive.map.groupby.sorted</name>
+  <value>false</value>
+  <description>If the bucketing/sorting properties of the table exactly match the grouping key, whether to 
+    perform the group by in the mapper by using BucketizedHiveInputFormat. The only downside to this
+    is that it limits the number of mappers to the number of files. 
+  </description>
+</property>
+
 <property>
   <name>hive.join.emit.interval</name>
   <value>1000</value>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Fri Sep 28 19:18:10 2012
@@ -321,7 +321,7 @@ public class ExecDriver extends Task<Map
       inpFormat = ShimLoader.getHadoopShims().getInputFormatClassName();
     }
 
-    if (getWork().isSmbJoin()) {
+    if (getWork().isUseBucketizedHiveInputFormat()) {
       inpFormat = BucketizedHiveInputFormat.class.getName();
     }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Fri Sep 28 19:18:10 2012
@@ -165,4 +165,9 @@ public class FilterOperator extends Oper
   public boolean supportSkewJoinOptimization() {
     return true;
   }
+
+  @Override
+  public boolean columnNamesRowResolvedCanBeObtained() {
+    return true;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Fri Sep 28 19:18:10 2012
@@ -120,8 +120,6 @@ public class GroupByOperator extends Ope
   // Used by hash distinct aggregations when hashGrpKeyNotRedKey is true
   protected transient HashSet<KeyWrapper> keysCurrentGroup;
 
-  transient boolean bucketGroup;
-
   transient boolean firstRow;
   transient long totalMemory;
   transient boolean hashAggr;
@@ -329,9 +327,8 @@ public class GroupByOperator extends Ope
       objectInspectors.add(roi);
     }
 
-    bucketGroup = conf.getBucketGroup();
     aggregationsParametersLastInvoke = new Object[conf.getAggregators().size()][];
-    if (conf.getMode() != GroupByDesc.Mode.HASH || bucketGroup) {
+    if (conf.getMode() != GroupByDesc.Mode.HASH || conf.getBucketGroup()) {
       aggregations = newAggregations();
       hashAggr = false;
     } else {
@@ -808,7 +805,6 @@ public class GroupByOperator extends Ope
     boolean keysAreEqual = (currentKeys != null && newKeys != null)?
         newKeys.equals(currentKeys) : false;
 
-
     // Forward the current keys if needed for sort-based aggregation
     if (currentKeys != null && !keysAreEqual) {
       forward(currentKeys.getKeyArray(), aggregations);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Sep 28 19:18:10 2012
@@ -103,6 +103,8 @@ public abstract class Operator<T extends
     seqId = 0;
   }
 
+  private boolean useBucketizedHiveInputFormat;
+
   public Operator() {
     id = String.valueOf(seqId++);
   }
@@ -708,6 +710,31 @@ public abstract class Operator<T extends
     }
   }
 
+  // Remove the operators till a certain depth.
+  // Return true if the remove was successful, false otherwise
+  public boolean removeChildren(int depth) {
+    Operator<? extends OperatorDesc> currOp = this;
+    for (int i = 0; i < depth; i++) {
+      // If there are more than 1 children at any level, don't do anything
+      if ((currOp.getChildOperators() == null) ||
+          (currOp.getChildOperators().size() > 1)) {
+        return false;
+      }
+      currOp = currOp.getChildOperators().get(0);
+    }
+
+    setChildOperators(currOp.getChildOperators());
+
+    List<Operator<? extends OperatorDesc>> parentOps =
+      new ArrayList<Operator<? extends OperatorDesc>>();
+    parentOps.add(this);
+
+    for (Operator<? extends OperatorDesc> op : currOp.getChildOperators()) {
+      op.setParentOperators(parentOps);
+    }
+    return true;
+  }
+
   /**
    * Replace one parent with another at the same position. Chilren of the new
    * parent are not updated
@@ -1376,4 +1403,16 @@ public abstract class Operator<T extends
 
     return ret;
   }
+
+  public boolean columnNamesRowResolvedCanBeObtained() {
+    return false;
+  }
+
+  public boolean isUseBucketizedHiveInputFormat() {
+    return useBucketizedHiveInputFormat;
+  }
+
+  public void setUseBucketizedHiveInputFormat(boolean useBucketizedHiveInputFormat) {
+    this.useBucketizedHiveInputFormat = useBucketizedHiveInputFormat;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Fri Sep 28 19:18:10 2012
@@ -105,4 +105,9 @@ public class SelectOperator extends Oper
   public boolean supportSkewJoinOptimization() {
     return true;
   }
+
+  @Override
+  public boolean columnNamesRowResolvedCanBeObtained() {
+    return true;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Fri Sep 28 19:18:10 2012
@@ -285,8 +285,8 @@ public final class GenMapRedUtils {
         bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias());
         bucketMJCxt.setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
         bucketMJCxt.setBigTablePartSpecToFileMapping(
-            currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());
-        plan.setSmbJoin(currMapJoinOp instanceof SMBMapJoinOperator);
+          currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());
+        plan.setUseBucketizedHiveInputFormat(currMapJoinOp instanceof SMBMapJoinOperator);
       }
     }
   }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Fri Sep 28 19:18:10 2012
@@ -19,8 +19,9 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -28,11 +29,13 @@ import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -53,15 +56,16 @@ import org.apache.hadoop.hive.ql.parse.S
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.util.StringUtils;
 
 /**
- *this transformation does bucket group by optimization.
+ * This transformation does group by optimization. If the grouping key is a superset
+ * of the bucketing and sorting keys of the underlying table in the same order, the
+ * group by can be performed on the map-side completely.
  */
 public class GroupByOptimizer implements Transform {
 
@@ -75,19 +79,31 @@ public class GroupByOptimizer implements
   public ParseContext transform(ParseContext pctx) throws SemanticException {
 
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    GroupByOptProcCtx groupByOptimizeCtx = new GroupByOptProcCtx();
+    HiveConf conf = pctx.getConf();
 
-    // process group-by pattern
-    opRules.put(new RuleRegExp("R1",
-      GroupByOperator.getOperatorName() + "%"
-      + ReduceSinkOperator.getOperatorName() + "%"
-      + GroupByOperator.getOperatorName() + "%"),
-      getMapAggreSortedGroupbyProc(pctx));
+    if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
+      // process group-by pattern
+      opRules.put(new RuleRegExp("R1",
+        GroupByOperator.getOperatorName() + "%" +
+        ReduceSinkOperator.getOperatorName() + "%" +
+        GroupByOperator.getOperatorName() + "%"),
+        getMapSortedGroupbyProc(pctx));
+    } else {
+      // If hive.groupby.skewindata is set to true, the operator tree is as below
+      opRules.put(new RuleRegExp("R2",
+        GroupByOperator.getOperatorName() + "%" +
+        ReduceSinkOperator.getOperatorName() + "%" +
+        GroupByOperator.getOperatorName() + "%" +
+        ReduceSinkOperator.getOperatorName() + "%" +
+        GroupByOperator.getOperatorName() + "%"),
+        getMapSortedGroupbySkewProc(pctx));
+    }
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
-    Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules,
-        groupByOptimizeCtx);
+    Dispatcher disp =
+      new DefaultRuleDispatcher(getDefaultProc(), opRules,
+      new GroupByOptimizerContext(conf));
     GraphWalker ogw = new DefaultGraphWalker(disp);
 
     // Create a list of topop nodes
@@ -102,212 +118,322 @@ public class GroupByOptimizer implements
     return new NodeProcessor() {
       @Override
       public Object process(Node nd, Stack<Node> stack,
-          NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+        NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
         return null;
       }
     };
   }
 
-  private NodeProcessor getMapAggreSortedGroupbyProc(ParseContext pctx) {
-    return new BucketGroupByProcessor(pctx);
+  private NodeProcessor getMapSortedGroupbyProc(ParseContext pctx) {
+    return new SortGroupByProcessor(pctx);
   }
 
+  private NodeProcessor getMapSortedGroupbySkewProc(ParseContext pctx) {
+    return new SortGroupBySkewProcessor(pctx);
+  }
+
+  public enum GroupByOptimizerSortMatch {
+    NO_MATCH, PARTIAL_MATCH, COMPLETE_MATCH
+  };
+
   /**
-   * BucketGroupByProcessor.
+   * SortGroupByProcessor.
    *
    */
-  public class BucketGroupByProcessor implements NodeProcessor {
+  public class SortGroupByProcessor implements NodeProcessor {
 
     protected ParseContext pGraphContext;
 
-    public BucketGroupByProcessor(ParseContext pGraphContext) {
+    public SortGroupByProcessor(ParseContext pGraphContext) {
       this.pGraphContext = pGraphContext;
     }
 
+    // Check if the group by operator has already been processed
+    protected boolean checkGroupByOperatorProcessed(
+      GroupByOptimizerContext groupBySortOptimizerContext,
+      GroupByOperator groupByOp) {
+
+      // The group by operator has already been processed
+      if (groupBySortOptimizerContext.getListGroupByOperatorsProcessed().contains(groupByOp)) {
+        return true;
+      }
+
+      groupBySortOptimizerContext.getListGroupByOperatorsProcessed().add(groupByOp);
+      return false;
+    }
+
+    protected void processGroupBy(GroupByOptimizerContext ctx,
+      Stack<Node> stack,
+      GroupByOperator groupByOp,
+      int depth) throws SemanticException {
+      HiveConf hiveConf = ctx.getConf();
+      GroupByOptimizerSortMatch match = checkSortGroupBy(stack, groupByOp);
+      boolean useMapperSort =
+        HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT);
+
+      if (useMapperSort) {
+        if (match == GroupByOptimizerSortMatch.COMPLETE_MATCH) {
+          convertGroupByMapSideSortedGroupBy(groupByOp, depth);
+        }
+      }
+      else if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||
+        (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
+        groupByOp.getConf().setBucketGroup(true);
+      }
+    }
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
       // GBY,RS,GBY... (top to bottom)
-      GroupByOperator op = (GroupByOperator) stack.get(stack.size() - 3);
-      checkBucketGroupBy(op);
+      GroupByOperator groupByOp = (GroupByOperator) stack.get(stack.size() - 3);
+
+      GroupByOptimizerContext ctx = (GroupByOptimizerContext)procCtx;
+
+      if (!checkGroupByOperatorProcessed(ctx, groupByOp)) {
+        processGroupBy(ctx, stack, groupByOp, 2);
+      }
       return null;
     }
 
-    private void checkBucketGroupBy(GroupByOperator curr)
-        throws SemanticException {
+    // Should this group by be converted to a map-side group by, because the grouping keys for
+    // the base table for the group by match the sort keys
+    protected GroupByOptimizerSortMatch checkSortGroupBy(Stack<Node> stack,
+      GroupByOperator groupByOp)
+      throws SemanticException {
 
       // if this is not a HASH groupby, return
-      if (curr.getConf().getMode() != GroupByDesc.Mode.HASH) {
-        return;
+      if (groupByOp.getConf().getMode() != GroupByDesc.Mode.HASH) {
+        return GroupByOptimizerSortMatch.NO_MATCH;
       }
 
-      Set<String> tblNames = pGraphContext.getGroupOpToInputTables().get(curr);
-      if (tblNames == null || tblNames.size() == 0) {
-        return;
-      }
+      // Check all the operators in the stack. Currently, only SELECTs and FILTERs
+      // are allowed. Operator.columnNamesRowResolvedCanBeObtained() has been added for the same
+      Operator<? extends OperatorDesc> currOp = groupByOp;
+      currOp = currOp.getParentOperators().get(0);
+
+      while (true) {
+        if (currOp.getParentOperators() == null) {
+          break;
+        }
 
-      boolean bucketGroupBy = true;
-      GroupByDesc desc = curr.getConf();
-      List<ExprNodeDesc> groupByKeys = new LinkedList<ExprNodeDesc>();
-      groupByKeys.addAll(desc.getKeys());
-      // compute groupby columns from groupby keys
-      List<String> groupByCols = new ArrayList<String>();
-      while (groupByKeys.size() > 0) {
-        ExprNodeDesc node = groupByKeys.remove(0);
-        if (node instanceof ExprNodeColumnDesc) {
-          groupByCols.addAll(node.getCols());
-        } else if ((node instanceof ExprNodeConstantDesc)
-            || (node instanceof ExprNodeNullDesc)) {
-          // nothing
-        } else if (node instanceof ExprNodeFieldDesc) {
-          groupByKeys.add(0, ((ExprNodeFieldDesc) node).getDesc());
-          continue;
-        } else if (node instanceof ExprNodeGenericFuncDesc) {
-          ExprNodeGenericFuncDesc udfNode = ((ExprNodeGenericFuncDesc) node);
-          GenericUDF udf = udfNode.getGenericUDF();
-          if (!FunctionRegistry.isDeterministic(udf)) {
-            return;
-          }
-          groupByKeys.addAll(0, udfNode.getChildExprs());
-        } else {
-          return;
+        if ((currOp.getParentOperators().size() > 1) ||
+            (!currOp.columnNamesRowResolvedCanBeObtained())) {
+          return GroupByOptimizerSortMatch.NO_MATCH;
         }
+
+        currOp = currOp.getParentOperators().get(0);
       }
 
-      if (groupByCols.size() == 0) {
-        return;
+      // currOp now points to the top-most tablescan operator
+      TableScanOperator tableScanOp = (TableScanOperator)currOp;
+      int stackPos = 0;
+      assert stack.get(0) == tableScanOp;
+
+      // Create a mapping from the group by columns to the table columns
+      Map<String, String> tableColsMapping = new HashMap<String, String>();
+      Set<String> constantCols = new HashSet<String>();
+      Table table = pGraphContext.getTopToTable().get(currOp);
+      for (FieldSchema col : table.getAllCols()) {
+        tableColsMapping.put(col.getName(), col.getName());
       }
 
-      for (String table : tblNames) {
-        Operator<? extends OperatorDesc> topOp = pGraphContext.getTopOps().get(
-            table);
-        if (topOp == null || (!(topOp instanceof TableScanOperator))) {
-          // this is in a sub-query.
-          // In future, we need to infer subq's columns propery. For example
-          // "select key, count(1)
-          // from (from clustergroupbyselect key, value where ds='210') group by key, 3;",
-          // even though the group by op is in a subquery, it can be changed to
-          // bucket groupby.
-          return;
-        }
-        TableScanOperator ts = (TableScanOperator) topOp;
-        Table destTable = pGraphContext.getTopToTable().get(ts);
-        if (destTable == null) {
-          return;
-        }
-        if (!destTable.isPartitioned()) {
-          List<String> bucketCols = destTable.getBucketCols();
-          List<String> sortCols = Utilities
-              .getColumnNamesFromSortCols(destTable.getSortCols());
-          bucketGroupBy = matchBucketOrSortedColumns(groupByCols, bucketCols,
-              sortCols);
-          if (!bucketGroupBy) {
-            return;
+      while (currOp != groupByOp) {
+        Operator<? extends OperatorDesc> processOp = currOp;
+        Set<String> newConstantCols = new HashSet<String>();
+        currOp = (Operator<? extends OperatorDesc>)(stack.get(++stackPos));
+
+        // Filters don't change the column names - so, no need to do anything for them
+        if (processOp instanceof SelectOperator) {
+          SelectOperator selectOp = (SelectOperator)processOp;
+          SelectDesc selectDesc = selectOp.getConf();
+
+          if (selectDesc.isSelStarNoCompute()) {
+            continue;
           }
-        } else {
-          PrunedPartitionList partsList = null;
-          try {
-            partsList = pGraphContext.getOpToPartList().get(ts);
-            if (partsList == null) {
-              partsList = PartitionPruner.prune(destTable, pGraphContext
-                .getOpToPartPruner().get(ts), pGraphContext.getConf(), table,
-                pGraphContext.getPrunedPartitions());
-              pGraphContext.getOpToPartList().put(ts, partsList);
+
+          // Only columns and constants can be selected
+          for (int pos = 0; pos < selectDesc.getColList().size(); pos++) {
+            String outputColumnName = selectDesc.getOutputColumnNames().get(pos);
+            if (constantCols.contains(outputColumnName)) {
+              tableColsMapping.remove(outputColumnName);
+              newConstantCols.add(outputColumnName);
+              continue;
+            }
+
+            ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
+            if (selectColList instanceof ExprNodeColumnDesc) {
+              String newValue =
+                tableColsMapping.get(((ExprNodeColumnDesc) selectColList).getColumn());
+              tableColsMapping.put(outputColumnName, newValue);
             }
-          } catch (HiveException e) {
-            // Has to use full name to make sure it does not conflict with
-            // org.apache.commons.lang.StringUtils
-            LOG.error(org.apache.hadoop.util.StringUtils.stringifyException(e));
-            throw new SemanticException(e.getMessage(), e);
+            else {
+              tableColsMapping.remove(outputColumnName);
+              if ((selectColList instanceof ExprNodeConstantDesc) ||
+                  (selectColList instanceof ExprNodeNullDesc)) {
+                newConstantCols.add(outputColumnName);
+              }
+            }
+          }
+
+          constantCols = newConstantCols;
+        }
+      }
+
+      boolean sortGroupBy = true;
+      // compute groupby columns from groupby keys
+      List<String> groupByCols = new ArrayList<String>();
+      // If the group by expression is anything other than a list of columns,
+      // the sorting property is not obeyed
+      for (ExprNodeDesc expr : groupByOp.getConf().getKeys()) {
+        if (expr instanceof ExprNodeColumnDesc) {
+          String groupByKeyColumn = ((ExprNodeColumnDesc)expr).getColumn();
+          // ignore if it is a constant
+          if (constantCols.contains(groupByKeyColumn)) {
+            continue;
           }
-          List<Partition> parts = new ArrayList<Partition>();
-          parts.addAll(partsList.getConfirmedPartns());
-          parts.addAll(partsList.getUnknownPartns());
-          for (Partition part : parts) {
-            List<String> bucketCols = part.getBucketCols();
-            List<String> sortCols = part.getSortColNames();
-            bucketGroupBy = matchBucketOrSortedColumns(groupByCols, bucketCols,
-                sortCols);
-            if (!bucketGroupBy) {
-              return;
+          else {
+            if (tableColsMapping.containsKey(groupByKeyColumn)) {
+              groupByCols.add(tableColsMapping.get(groupByKeyColumn));
+            }
+            else {
+              return GroupByOptimizerSortMatch.NO_MATCH;
             }
           }
         }
+        // Constants and nulls are OK
+        else if ((expr instanceof ExprNodeConstantDesc) ||
+          (expr instanceof ExprNodeNullDesc)) {
+          continue;
+        } else {
+          return GroupByOptimizerSortMatch.NO_MATCH;
+        }
       }
 
-      curr.getConf().setBucketGroup(bucketGroupBy);
+      if (!table.isPartitioned()) {
+        List<String> sortCols = Utilities.getColumnNamesFromSortCols(table.getSortCols());
+        return matchSortColumns(groupByCols, sortCols);
+      } else {
+        PrunedPartitionList partsList = null;
+        try {
+          partsList = pGraphContext.getOpToPartList().get(tableScanOp);
+          if (partsList == null) {
+            partsList = PartitionPruner.prune(table,
+              pGraphContext.getOpToPartPruner().get(tableScanOp),
+              pGraphContext.getConf(),
+              table.getTableName(),
+              pGraphContext.getPrunedPartitions());
+            pGraphContext.getOpToPartList().put(tableScanOp, partsList);
+          }
+        } catch (HiveException e) {
+          LOG.error(StringUtils.stringifyException(e));
+          throw new SemanticException(e.getMessage(), e);
+        }
+
+        GroupByOptimizerSortMatch currentMatch = GroupByOptimizerSortMatch.COMPLETE_MATCH;
+        for (Partition part : partsList.getNotDeniedPartns()) {
+          List<String> sortCols = part.getSortColNames();
+          GroupByOptimizerSortMatch match = matchSortColumns(groupByCols, sortCols);
+          if (match == GroupByOptimizerSortMatch.NO_MATCH) {
+            return match;
+          }
+
+          if (match == GroupByOptimizerSortMatch.PARTIAL_MATCH) {
+            currentMatch = match;
+          }
+        }
+        return currentMatch;
+      }
     }
 
     /**
-     * Given the group by keys, bucket columns, sort column, this method
+     * Given the group by keys, sort columns, this method
      * determines if we can use sorted group by or not.
-     *
-     * We use bucket columns only when the sorted column set is empty and if all
-     * group by columns are contained in bucket columns.
-     *
-     * If we can can not determine by looking at bucketed columns and the table
-     * has sort columns, we resort to sort columns. We can use bucket group by
-     * if the groupby column set is an exact prefix match of sort columns.
+     * We can use map-side sort group by if the group by columns match the sorted columns
+     * in exactly the same order.
      *
      * @param groupByCols
-     * @param bucketCols
      * @param sortCols
      * @return
      * @throws SemanticException
      */
-    private boolean matchBucketOrSortedColumns(List<String> groupByCols,
-        List<String> bucketCols, List<String> sortCols) throws SemanticException {
-      boolean ret = false;
+    private GroupByOptimizerSortMatch matchSortColumns(
+      List<String> groupByCols,
+      List<String> sortCols) throws SemanticException {
 
       if (sortCols == null || sortCols.size() == 0) {
-        ret = matchBucketColumns(groupByCols, bucketCols);
+        return GroupByOptimizerSortMatch.NO_MATCH;
       }
 
-      if (!ret && sortCols != null && sortCols.size() >= groupByCols.size()) {
-        // check sort columns, if groupByCols is a prefix subset of sort
-        // columns, we will use sorted group by. For example, if data is sorted
-        // by column a, b, c, and a query wants to group by b,a, we will use
-        // sorted group by. But if the query wants to groupby b,c, then sorted
-        // group by can not be used.
-        int num = groupByCols.size();
-        for (int i = 0; i < num; i++) {
-          if (sortCols.indexOf(groupByCols.get(i)) > (num - 1)) {
-            return false;
-          }
+      int num = sortCols.size() <  groupByCols.size() ? sortCols.size() : groupByCols.size();
+      for (int i = 0; i < num; i++) {
+        if (!sortCols.get(i).equals(groupByCols.get(i))) {
+          return GroupByOptimizerSortMatch.NO_MATCH;
         }
-        return true;
       }
 
-      return ret;
+      return sortCols.size() == groupByCols.size() ?
+        GroupByOptimizerSortMatch.COMPLETE_MATCH : GroupByOptimizerSortMatch.PARTIAL_MATCH;
     }
 
-    /*
-     * All group by columns should be contained in the bucket column set. And
-     * the number of group by columns should be equal to number of bucket
-     * columns.
-     */
-    private boolean matchBucketColumns(List<String> grpCols,
-        List<String> tblBucketCols) throws SemanticException {
-
-      if (tblBucketCols == null || tblBucketCols.size() == 0
-          || grpCols.size() == 0 || grpCols.size() != tblBucketCols.size()) {
-        return false;
-      }
-
-      for (int i = 0; i < grpCols.size(); i++) {
-        String tblCol = grpCols.get(i);
-        if (!tblBucketCols.contains(tblCol)) {
-          return false;
-        }
+    // Convert the group by to a map-side group by
+    // The operators specified by depth are removed from the tree.
+    protected void convertGroupByMapSideSortedGroupBy(GroupByOperator groupByOp, int depth) {
+      if (groupByOp.removeChildren(depth)) {
+        // Use bucketized hive input format - that makes sure that one mapper reads the entire file
+        groupByOp.setUseBucketizedHiveInputFormat(true);
+        groupByOp.getConf().setMode(GroupByDesc.Mode.FINAL);
       }
-      return true;
     }
   }
 
   /**
-   * GroupByOptProcCtx.
+   * SortGroupBySkewProcessor.
    *
    */
-  public class GroupByOptProcCtx implements NodeProcessorCtx {
+  public class SortGroupBySkewProcessor extends SortGroupByProcessor {
+    public SortGroupBySkewProcessor(ParseContext pGraphContext) {
+      super(pGraphContext);
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      // GBY,RS,GBY,RS,GBY... (top to bottom)
+      GroupByOperator groupByOp = (GroupByOperator) stack.get(stack.size() - 5);
+      GroupByOptimizerContext ctx = (GroupByOptimizerContext)procCtx;
+
+      if (!checkGroupByOperatorProcessed(ctx, groupByOp)) {
+        processGroupBy(ctx, stack, groupByOp, 4);
+      }
+      return null;
+    }
+  }
+
+  public class GroupByOptimizerContext implements NodeProcessorCtx {
+    List<GroupByOperator> listGroupByOperatorsProcessed;
+    HiveConf conf;
+
+    public GroupByOptimizerContext(HiveConf conf) {
+      this.conf = conf;
+      listGroupByOperatorsProcessed = new ArrayList<GroupByOperator>();
+    }
+
+    public List<GroupByOperator> getListGroupByOperatorsProcessed() {
+      return listGroupByOperatorsProcessed;
+    }
+
+    public void setListGroupByOperatorsProcessed(
+      List<GroupByOperator> listGroupByOperatorsProcessed) {
+      this.listGroupByOperatorsProcessed = listGroupByOperatorsProcessed;
+    }
+
+    public HiveConf getConf() {
+      return conf;
+    }
+
+    public void setConf(HiveConf conf) {
+      this.conf = conf;
+    }
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Fri Sep 28 19:18:10 2012
@@ -63,7 +63,8 @@ public class Optimizer {
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGBYUSINGINDEX)) {
       transformations.add(new RewriteGBUsingIndex());
     }
-    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGROUPBY)) {
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGROUPBY) ||
+        HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT)) {
       transformations.add(new GroupByOptimizer());
     }
     transformations.add(new SamplePruner());

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Fri Sep 28 19:18:10 2012
@@ -7263,6 +7263,12 @@ public class SemanticAnalyzer extends Ba
       setKeyDescTaskTree(rootTask);
     }
 
+    // If a task contains an operator which instructs bucketizedhiveinputformat
+    // to be used, please do so
+    for (Task<? extends Serializable> rootTask : rootTasks) {
+      setInputFormat(rootTask);
+    }
+
     PhysicalContext physicalContext = new PhysicalContext(conf,
         getParseContext(), ctx, rootTasks, fetchTask);
     PhysicalOptimizer physicalOptimizer = new PhysicalOptimizer(
@@ -7443,6 +7449,43 @@ public class SemanticAnalyzer extends Ba
     }
   }
 
+  private void setInputFormat(MapredWork work, Operator<? extends OperatorDesc> op) {
+    if (op.isUseBucketizedHiveInputFormat()) { // one flagged operator is enough to mark the work
+      work.setUseBucketizedHiveInputFormat(true);
+      return; // no need to look further down this subtree
+    }
+
+    if (op.getChildOperators() != null) { // a null child list means there is nothing below op
+      for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
+        setInputFormat(work, childOp); // depth-first search through the operator tree
+      }
+    }
+  }
+
+  // Recursively visit every task: ExecDriver work, ConditionalTask branches, then child tasks
+  private void setInputFormat(Task<? extends Serializable> task) {
+    if (task instanceof ExecDriver) {
+      MapredWork work = (MapredWork) task.getWork();
+      HashMap<String, Operator<? extends OperatorDesc>> opMap = work.getAliasToWork();
+      if (!opMap.isEmpty()) {
+        for (Operator<? extends OperatorDesc> op : opMap.values()) {
+          setInputFormat(work, op); // flags work if an operator under op requests it
+        }
+      }
+    } else if (task instanceof ConditionalTask) {
+      List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task).getListTasks();
+      for (Task<? extends Serializable> tsk : listTasks) {
+        setInputFormat(tsk); // every candidate branch is inspected
+      }
+    }
+
+    if (task.getChildTasks() != null) { // children are followed whatever the task type
+      for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+        setInputFormat(childTask);
+      }
+    }
+  }
+
   // loop over all the tasks recursviely
   private void setKeyDescTaskTree(Task<? extends Serializable> task) {
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/GroupByDesc.java Fri Sep 28 19:18:10 2012
@@ -52,6 +52,8 @@ public class GroupByDesc extends Abstrac
 
   private Mode mode;
   private boolean groupKeyNotReductionKey;
+
+  // no hash aggregations for group by
   private boolean bucketGroup;
 
   private ArrayList<ExprNodeDesc> keys;
@@ -177,8 +179,8 @@ public class GroupByDesc extends Abstrac
     return bucketGroup;
   }
 
-  public void setBucketGroup(boolean dataSorted) {
-    bucketGroup = dataSorted;
+  public void setBucketGroup(boolean bucketGroup) {
+    this.bucketGroup = bucketGroup;
   }
 
   /**

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Fri Sep 28 19:18:10 2012
@@ -89,7 +89,7 @@ public class MapredWork extends Abstract
   // used to indicate the input is sorted, and so a BinarySearchRecordReader shoudl be used
   private boolean inputFormatSorted = false;
 
-  private transient boolean smbJoin;
+  private transient boolean useBucketizedHiveInputFormat;
 
   public MapredWork() {
     aliasToPartnInfo = new LinkedHashMap<String, PartitionDesc>();
@@ -488,11 +488,11 @@ public class MapredWork extends Abstract
     return returnList;
   }
 
-  public boolean isSmbJoin() {
-    return smbJoin;
+  public boolean isUseBucketizedHiveInputFormat() {
+    return useBucketizedHiveInputFormat;
   }
 
-  public void setSmbJoin(boolean smbJoin) {
-    this.smbJoin = smbJoin;
+  public void setUseBucketizedHiveInputFormat(boolean useBucketizedHiveInputFormat) {
+    this.useBucketizedHiveInputFormat = useBucketizedHiveInputFormat;
   }
 }

Added: hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_1.q?rev=1391608&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_1.q Fri Sep 28 19:18:10 2012
@@ -0,0 +1,282 @@
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+set hive.exec.reducers.max = 10;
+set hive.map.groupby.sorted=true;
+
+CREATE TABLE T1(key STRING, val STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+-- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 select key, val from T1;
+
+CREATE TABLE outputTbl1(key int, cnt int);
+
+-- The plan should be converted to a map-side group by if the group by key
+-- matches the sorted key
+-- adding an order by at the end to make the test results deterministic
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T1 GROUP BY key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T1 GROUP BY key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+CREATE TABLE outputTbl2(key1 int, key2 string, cnt int);
+
+-- no map-side group by even if the group by key is a superset of the sorted key
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl2
+SELECT key, val, count(1) FROM T1 GROUP BY key, val;
+
+INSERT OVERWRITE TABLE outputTbl2
+SELECT key, val, count(1) FROM T1 GROUP BY key, val;
+
+SELECT * FROM outputTbl2 ORDER BY key1, key2;
+
+-- It should work for sub-queries
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM (SELECT key, val FROM T1) subq1 GROUP BY key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM (SELECT key, val FROM T1) subq1 GROUP BY key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- It should work for sub-queries with column aliases
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT k, count(1) FROM (SELECT key as k, val as v FROM T1) subq1 GROUP BY k;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT k, count(1) FROM (SELECT key as k, val as v FROM T1) subq1 GROUP BY k;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+CREATE TABLE outputTbl3(key1 int, key2 int, cnt int);
+
+-- The plan should be converted to a map-side group by if the group by key contains a constant followed
+-- by a match to the sorted key
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl3
+SELECT 1, key, count(1) FROM T1 GROUP BY 1, key;
+
+INSERT OVERWRITE TABLE outputTbl3
+SELECT 1, key, count(1) FROM T1 GROUP BY 1, key;
+
+SELECT * FROM outputTbl3 ORDER BY key1, key2;
+
+CREATE TABLE outputTbl4(key1 int, key2 int, key3 string, cnt int);
+
+-- no map-side group by if the group by key contains a constant followed by another column
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T1 GROUP BY key, 1, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T1 GROUP BY key, 1, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+-- no map-side group by if the group by key contains a function
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl3
+SELECT key, key + 1, count(1) FROM T1 GROUP BY key, key + 1;
+
+INSERT OVERWRITE TABLE outputTbl3
+SELECT key, key + 1, count(1) FROM T1 GROUP BY key, key + 1;
+
+SELECT * FROM outputTbl3 ORDER BY key1, key2;
+
+-- it should not matter what follows the group by
+-- test various cases
+
+-- group by followed by another group by
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key + key, sum(cnt) from
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+group by key + key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key + key, sum(cnt) from
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+group by key + key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a union
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) FROM T1 GROUP BY key
+  UNION ALL
+SELECT key, count(1) FROM T1 GROUP BY key
+) subq1;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) FROM T1 GROUP BY key
+  UNION ALL
+SELECT key, count(1) FROM T1 GROUP BY key
+) subq1;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a union where one of the sub-queries is map-side group by
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) FROM T1 GROUP BY key
+  UNION ALL
+SELECT key + key as key, count(1) FROM T1 GROUP BY key + key
+) subq1;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) as cnt FROM T1 GROUP BY key
+  UNION ALL
+SELECT key + key as key, count(1) as cnt FROM T1 GROUP BY key + key
+) subq1;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a join
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl1
+SELECT subq1.key, subq1.cnt+subq2.cnt FROM 
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+JOIN
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq2
+ON subq1.key = subq2.key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT subq1.key, subq1.cnt+subq2.cnt FROM 
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+JOIN
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq2
+ON subq1.key = subq2.key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a join where one of the sub-queries can be performed in the mapper
+EXPLAIN EXTENDED 
+SELECT * FROM 
+(SELECT key, count(1) FROM T1 GROUP BY key) subq1
+JOIN
+(SELECT key, val, count(1) FROM T1 GROUP BY key, val) subq2
+ON subq1.key = subq2.key;
+
+CREATE TABLE T2(key STRING, val STRING)
+CLUSTERED BY (key, val) SORTED BY (key, val) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+-- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T2 select key, val from T1;
+
+-- no mapside sort group by if the group by is a prefix of the sorted key
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T2 GROUP BY key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T2 GROUP BY key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- The plan should be converted to a map-side group by if the group by key contains a constant in between the
+-- sorted keys
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T2 GROUP BY key, 1, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T2 GROUP BY key, 1, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+CREATE TABLE outputTbl5(key1 int, key2 int, key3 string, key4 int, cnt int);
+
+-- The plan should be converted to a map-side group by if the group by key contains a constant in between the
+-- sorted keys followed by anything
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl5
+SELECT key, 1, val, 2, count(1) FROM T2 GROUP BY key, 1, val, 2;
+
+INSERT OVERWRITE TABLE outputTbl5
+SELECT key, 1, val, 2, count(1) FROM T2 GROUP BY key, 1, val, 2;
+
+SELECT * FROM outputTbl5 
+ORDER BY key1, key2, key3, key4;
+
+-- constants from sub-queries should work fine
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, constant, val, count(1) from 
+(SELECT key, 1 as constant, val from T2)subq
+group by key, constant, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, constant, val, count(1) from 
+(SELECT key, 1 as constant, val from T2)subq
+group by key, constant, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+-- multiple levels of constants from sub-queries should work fine
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl4
+select key, constant3, val, count(1) from
+(
+SELECT key, constant as constant2, val, 2 as constant3 from 
+(SELECT key, 1 as constant, val from T2)subq
+)subq2
+group by key, constant3, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+select key, constant3, val, count(1) from
+(
+SELECT key, constant as constant2, val, 2 as constant3 from 
+(SELECT key, 1 as constant, val from T2)subq
+)subq2
+group by key, constant3, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+set hive.map.aggr=true;
+set hive.multigroupby.singlereducer=false;
+set mapred.reduce.tasks=31;
+
+CREATE TABLE DEST1(key INT, cnt INT);
+CREATE TABLE DEST2(key INT, val STRING, cnt INT);
+
+SET hive.exec.compress.intermediate=true;
+SET hive.exec.compress.output=true; 
+
+EXPLAIN
+FROM T2
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+FROM T2
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+select * from DEST1 ORDER BY key, cnt;
+select * from DEST2 ORDER BY key, val, val;
+
+-- multi-table insert with a sub-query
+EXPLAIN
+FROM (select key, val from T2 where key = 8) x
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+FROM (select key, val from T2 where key = 8) x
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+select * from DEST1 ORDER BY key, cnt;
+select * from DEST2 ORDER BY key, val, cnt;

Added: hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_skew_1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_skew_1.q?rev=1391608&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_skew_1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby_sort_skew_1.q Fri Sep 28 19:18:10 2012
@@ -0,0 +1,283 @@
+set hive.enforce.bucketing = true;
+set hive.enforce.sorting = true;
+set hive.exec.reducers.max = 10;
+set hive.map.groupby.sorted=true;
+set hive.groupby.skewindata=true;
+
+CREATE TABLE T1(key STRING, val STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+-- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T1 select key, val from T1;
+
+CREATE TABLE outputTbl1(key int, cnt int);
+
+-- The plan should be converted to a map-side group by if the group by key
+-- matches the sorted key
+-- adding an order by at the end to make the test results deterministic
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T1 GROUP BY key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T1 GROUP BY key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+CREATE TABLE outputTbl2(key1 int, key2 string, cnt int);
+
+-- no map-side group by even if the group by key is a superset of the sorted key
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl2
+SELECT key, val, count(1) FROM T1 GROUP BY key, val;
+
+INSERT OVERWRITE TABLE outputTbl2
+SELECT key, val, count(1) FROM T1 GROUP BY key, val;
+
+SELECT * FROM outputTbl2 ORDER BY key1, key2;
+
+-- It should work for sub-queries
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM (SELECT key, val FROM T1) subq1 GROUP BY key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM (SELECT key, val FROM T1) subq1 GROUP BY key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- It should work for sub-queries with column aliases
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT k, count(1) FROM (SELECT key as k, val as v FROM T1) subq1 GROUP BY k;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT k, count(1) FROM (SELECT key as k, val as v FROM T1) subq1 GROUP BY k;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+CREATE TABLE outputTbl3(key1 int, key2 int, cnt int);
+
+-- The plan should be converted to a map-side group by if the group by key contains a constant followed
+-- by a match to the sorted key
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl3
+SELECT 1, key, count(1) FROM T1 GROUP BY 1, key;
+
+INSERT OVERWRITE TABLE outputTbl3
+SELECT 1, key, count(1) FROM T1 GROUP BY 1, key;
+
+SELECT * FROM outputTbl3 ORDER BY key1, key2;
+
+CREATE TABLE outputTbl4(key1 int, key2 int, key3 string, cnt int);
+
+-- no map-side group by if the group by key contains a constant followed by another column
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T1 GROUP BY key, 1, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T1 GROUP BY key, 1, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+-- no map-side group by if the group by key contains a function
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl3
+SELECT key, key + 1, count(1) FROM T1 GROUP BY key, key + 1;
+
+INSERT OVERWRITE TABLE outputTbl3
+SELECT key, key + 1, count(1) FROM T1 GROUP BY key, key + 1;
+
+SELECT * FROM outputTbl3 ORDER BY key1, key2;
+
+-- it should not matter what follows the group by
+-- test various cases
+
+-- group by followed by another group by
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key + key, sum(cnt) from
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+group by key + key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key + key, sum(cnt) from
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+group by key + key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a union
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) FROM T1 GROUP BY key
+  UNION ALL
+SELECT key, count(1) FROM T1 GROUP BY key
+) subq1;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) FROM T1 GROUP BY key
+  UNION ALL
+SELECT key, count(1) FROM T1 GROUP BY key
+) subq1;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a union where one of the sub-queries is map-side group by
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) FROM T1 GROUP BY key
+  UNION ALL
+SELECT key + key as key, count(1) FROM T1 GROUP BY key + key
+) subq1;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT * FROM (
+SELECT key, count(1) as cnt FROM T1 GROUP BY key
+  UNION ALL
+SELECT key + key as key, count(1) as cnt FROM T1 GROUP BY key + key
+) subq1;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a join
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl1
+SELECT subq1.key, subq1.cnt+subq2.cnt FROM 
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+JOIN
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq2
+ON subq1.key = subq2.key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT subq1.key, subq1.cnt+subq2.cnt FROM 
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq1
+JOIN
+(SELECT key, count(1) as cnt FROM T1 GROUP BY key) subq2
+ON subq1.key = subq2.key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- group by followed by a join where one of the sub-queries can be performed in the mapper
+EXPLAIN EXTENDED 
+SELECT * FROM 
+(SELECT key, count(1) FROM T1 GROUP BY key) subq1
+JOIN
+(SELECT key, val, count(1) FROM T1 GROUP BY key, val) subq2
+ON subq1.key = subq2.key;
+
+CREATE TABLE T2(key STRING, val STRING)
+CLUSTERED BY (key, val) SORTED BY (key, val) INTO 2 BUCKETS STORED AS TEXTFILE;
+
+-- perform an insert to make sure there are 2 files
+INSERT OVERWRITE TABLE T2 select key, val from T1;
+
+-- no mapside sort group by if the group by is a prefix of the sorted key
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T2 GROUP BY key;
+
+INSERT OVERWRITE TABLE outputTbl1
+SELECT key, count(1) FROM T2 GROUP BY key;
+
+SELECT * FROM outputTbl1 ORDER BY key;
+
+-- The plan should be converted to a map-side group by if the group by key contains a constant in between the
+-- sorted keys
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T2 GROUP BY key, 1, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, 1, val, count(1) FROM T2 GROUP BY key, 1, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+CREATE TABLE outputTbl5(key1 int, key2 int, key3 string, key4 int, cnt int);
+
+-- The plan should be converted to a map-side group by if the group by key contains a constant in between the
+-- sorted keys followed by anything
+EXPLAIN EXTENDED 
+INSERT OVERWRITE TABLE outputTbl5
+SELECT key, 1, val, 2, count(1) FROM T2 GROUP BY key, 1, val, 2;
+
+INSERT OVERWRITE TABLE outputTbl5
+SELECT key, 1, val, 2, count(1) FROM T2 GROUP BY key, 1, val, 2;
+
+SELECT * FROM outputTbl5 
+ORDER BY key1, key2, key3, key4;
+
+-- constants from sub-queries should work fine
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, constant, val, count(1) from 
+(SELECT key, 1 as constant, val from T2)subq
+group by key, constant, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+SELECT key, constant, val, count(1) from 
+(SELECT key, 1 as constant, val from T2)subq
+group by key, constant, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+-- multiple levels of constants from sub-queries should work fine
+EXPLAIN EXTENDED
+INSERT OVERWRITE TABLE outputTbl4
+select key, constant3, val, count(1) from
+(
+SELECT key, constant as constant2, val, 2 as constant3 from 
+(SELECT key, 1 as constant, val from T2)subq
+)subq2
+group by key, constant3, val;
+
+INSERT OVERWRITE TABLE outputTbl4
+select key, constant3, val, count(1) from
+(
+SELECT key, constant as constant2, val, 2 as constant3 from 
+(SELECT key, 1 as constant, val from T2)subq
+)subq2
+group by key, constant3, val;
+
+SELECT * FROM outputTbl4 ORDER BY key1, key2, key3;
+
+set hive.map.aggr=true;
+set hive.multigroupby.singlereducer=false;
+set mapred.reduce.tasks=31;
+
+CREATE TABLE DEST1(key INT, cnt INT);
+CREATE TABLE DEST2(key INT, val STRING, cnt INT);
+
+SET hive.exec.compress.intermediate=true;
+SET hive.exec.compress.output=true; 
+
+EXPLAIN
+FROM T2
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+FROM T2
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+select * from DEST1 ORDER BY key, cnt;
+select * from DEST2 ORDER BY key, val, val;
+
+-- multi-table insert with a sub-query
+EXPLAIN
+FROM (select key, val from T2 where key = 8) x
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+FROM (select key, val from T2 where key = 8) x
+INSERT OVERWRITE TABLE DEST1 SELECT key, count(1) GROUP BY key
+INSERT OVERWRITE TABLE DEST2 SELECT key, val, count(1) GROUP BY key, val;
+
+select * from DEST1 ORDER BY key, cnt;
+select * from DEST2 ORDER BY key, val, cnt;

Modified: hive/trunk/ql/src/test/results/clientpositive/bucket_groupby.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/bucket_groupby.q.out?rev=1391608&r1=1391607&r2=1391608&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/bucket_groupby.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/bucket_groupby.q.out Fri Sep 28 19:18:10 2012
@@ -60,7 +60,7 @@ STAGE PLANS:
               Group By Operator
                 aggregations:
                       expr: count(1)
-                bucketGroup: true
+                bucketGroup: false
                 keys:
                       expr: key
                       type: string
@@ -185,7 +185,7 @@ STAGE PLANS:
               Group By Operator
                 aggregations:
                       expr: count(1)
-                bucketGroup: true
+                bucketGroup: false
                 keys:
                       expr: key
                       type: string
@@ -289,7 +289,7 @@ STAGE PLANS:
               Group By Operator
                 aggregations:
                       expr: count(1)
-                bucketGroup: true
+                bucketGroup: false
                 keys:
                       expr: length(key)
                       type: int
@@ -384,7 +384,7 @@ STAGE PLANS:
               Group By Operator
                 aggregations:
                       expr: count(1)
-                bucketGroup: true
+                bucketGroup: false
                 keys:
                       expr: abs(length(key))
                       type: int
@@ -481,7 +481,7 @@ STAGE PLANS:
               Group By Operator
                 aggregations:
                       expr: count(1)
-                bucketGroup: true
+                bucketGroup: false
                 keys:
                       expr: key
                       type: string
@@ -700,7 +700,7 @@ STAGE PLANS:
               Group By Operator
                 aggregations:
                       expr: count(1)
-                bucketGroup: true
+                bucketGroup: false
                 keys:
                       expr: key
                       type: string
@@ -1102,7 +1102,7 @@ STAGE PLANS:
               Group By Operator
                 aggregations:
                       expr: count(1)
-                bucketGroup: true
+                bucketGroup: false
                 keys:
                       expr: key
                       type: string