You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by he...@apache.org on 2012/01/03 19:10:36 UTC

svn commit: r1226903 [1/5] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org/apache/hadoop/hive/ql/plan/ ql/src/test/queries/clientpositive/ ql/src/test/results/clientpositive/

Author: heyongqiang
Date: Tue Jan  3 18:10:34 2012
New Revision: 1226903

URL: http://svn.apache.org/viewvc?rev=1226903&view=rev
Log:
HIVE-2621:Allow multiple group bys with the same input data and spray keys to be run on the same reducer. (Kevin via He Yongqiang)

Added:
    hive/trunk/ql/src/test/queries/clientpositive/groupby7_map_multi_single_reducer.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew_multi_single_reducer.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby_complex_types_multi_single_reducer.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby_multi_single_reducer.q
    hive/trunk/ql/src/test/results/clientpositive/groupby7_map_multi_single_reducer.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby7_noskew_multi_single_reducer.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_complex_types_multi_single_reducer.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby_multi_single_reducer.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java
    hive/trunk/ql/src/test/queries/clientpositive/groupby10.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby7_map.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby8.q
    hive/trunk/ql/src/test/queries/clientpositive/groupby9.q
    hive/trunk/ql/src/test/queries/clientpositive/multigroupby_singlemr.q
    hive/trunk/ql/src/test/results/clientpositive/groupby10.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby8.q.out
    hive/trunk/ql/src/test/results/clientpositive/groupby9.q.out
    hive/trunk/ql/src/test/results/clientpositive/multi_insert.q.out
    hive/trunk/ql/src/test/results/clientpositive/multigroupby_singlemr.q.out
    hive/trunk/ql/src/test/results/clientpositive/parallel.q.out

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Jan  3 18:10:34 2012
@@ -350,7 +350,7 @@ public class HiveConf extends Configurat
     HIVEMAPJOINFOLLOWEDBYMAPAGGRHASHMEMORY("hive.mapjoin.followby.map.aggr.hash.percentmemory", (float) 0.3),
     HIVEMAPAGGRMEMORYTHRESHOLD("hive.map.aggr.hash.force.flush.memory.threshold", (float) 0.9),
     HIVEMAPAGGRHASHMINREDUCTION("hive.map.aggr.hash.min.reduction", (float) 0.5),
-    HIVEMULTIGROUPBYSINGLEMR("hive.multigroupby.singlemr", false),
+    HIVEMULTIGROUPBYSINGLEREDUCER("hive.multigroupby.singlereducer", true),
 
     // for hive udtf operator
     HIVEUDTFAUTOPROGRESS("hive.udtf.auto.progress", false),

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Tue Jan  3 18:10:34 2012
@@ -33,6 +33,7 @@ import java.util.TreeSet;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
+import org.antlr.runtime.tree.Tree;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.Path;
@@ -48,6 +49,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryProperties;
 import org.apache.hadoop.hive.ql.exec.AbstractMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.ExecDriver;
@@ -58,7 +60,6 @@ import org.apache.hadoop.hive.ql.exec.Fu
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapRedTask;
-import org.apache.hadoop.hive.ql.exec.ArchiveUtils;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.RecordReader;
@@ -153,6 +154,7 @@ import org.apache.hadoop.hive.ql.session
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -170,7 +172,6 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.InputFormat;
-import org.antlr.runtime.tree.Tree;
 
 /**
  * Implementation of the semantic analyzer.
@@ -2892,12 +2893,62 @@ public class SemanticAnalyzer extends Ba
     RowResolver reduceSinkOutputRowResolver = new RowResolver();
     reduceSinkOutputRowResolver.setIsExprResolver(true);
     Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
-    ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
     // Pre-compute group-by keys and store in reduceKeys
 
     List<String> outputKeyColumnNames = new ArrayList<String>();
     List<String> outputValueColumnNames = new ArrayList<String>();
     List<ASTNode> grpByExprs = getGroupByForClause(parseInfo, dest);
+
+    ArrayList<ExprNodeDesc> reduceKeys = getReduceKeysForReduceSink(grpByExprs, dest,
+        reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames,
+        colExprMap);
+
+    List<List<Integer>> distinctColIndices = getDistinctColIndicesForReduceSink(parseInfo, dest,
+        reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames);
+
+    ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
+    HashMap<String, ASTNode> aggregationTrees = parseInfo
+        .getAggregationExprsForClause(dest);
+
+    if (!mapAggrDone) {
+      getReduceValuesForReduceSinkNoMapAgg(parseInfo, dest, reduceSinkInputRowResolver,
+          reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues);
+    } else {
+      // Put partial aggregation results in reduceValues
+      int inputField = reduceKeys.size();
+
+      for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
+
+        TypeInfo type = reduceSinkInputRowResolver.getColumnInfos().get(
+            inputField).getType();
+        reduceValues.add(new ExprNodeColumnDesc(type,
+            getColumnInternalName(inputField), "", false));
+        inputField++;
+        outputValueColumnNames.add(getColumnInternalName(reduceValues.size() - 1));
+        String field = Utilities.ReduceField.VALUE.toString() + "."
+            + getColumnInternalName(reduceValues.size() - 1);
+        reduceSinkOutputRowResolver.putExpression(entry.getValue(),
+            new ColumnInfo(field, type, null, false));
+      }
+    }
+
+    ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
+        OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
+        grpByExprs.size(), reduceValues, distinctColIndices,
+        outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields,
+        numReducers), new RowSchema(reduceSinkOutputRowResolver
+        .getColumnInfos()), inputOperatorInfo), reduceSinkOutputRowResolver);
+    rsOp.setColumnExprMap(colExprMap);
+    return rsOp;
+  }
+
+  private ArrayList<ExprNodeDesc> getReduceKeysForReduceSink(List<ASTNode> grpByExprs, String dest,
+      RowResolver reduceSinkInputRowResolver, RowResolver reduceSinkOutputRowResolver,
+      List<String> outputKeyColumnNames, Map<String, ExprNodeDesc> colExprMap)
+      throws SemanticException {
+
+    ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
+
     for (int i = 0; i < grpByExprs.size(); ++i) {
       ASTNode grpbyExpr = grpByExprs.get(i);
       ExprNodeDesc inputExpr = genExprNodeDesc(grpbyExpr,
@@ -2917,7 +2968,16 @@ public class SemanticAnalyzer extends Ba
       }
     }
 
+    return reduceKeys;
+  }
+
+  private List<List<Integer>> getDistinctColIndicesForReduceSink(QBParseInfo parseInfo, String dest,
+      ArrayList<ExprNodeDesc> reduceKeys, RowResolver reduceSinkInputRowResolver,
+      RowResolver reduceSinkOutputRowResolver, List<String> outputKeyColumnNames)
+      throws SemanticException {
+
     List<List<Integer>> distinctColIndices = new ArrayList<List<Integer>>();
+
     // If there is a distinctFuncExp, add all parameters to the reduceKeys.
     if (!parseInfo.getDistinctFuncExprsForClause(dest).isEmpty()) {
       List<ASTNode> distFuncs = parseInfo.getDistinctFuncExprsForClause(dest);
@@ -2957,17 +3017,80 @@ public class SemanticAnalyzer extends Ba
       }
     }
 
-    ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
+    return distinctColIndices;
+  }
+
+  private void getReduceValuesForReduceSinkNoMapAgg(QBParseInfo parseInfo, String dest,
+      RowResolver reduceSinkInputRowResolver, RowResolver reduceSinkOutputRowResolver,
+      List<String> outputValueColumnNames, ArrayList<ExprNodeDesc> reduceValues)
+      throws SemanticException {
     HashMap<String, ASTNode> aggregationTrees = parseInfo
         .getAggregationExprsForClause(dest);
 
-    if (!mapAggrDone) {
-      // Put parameters to aggregations in reduceValues
-      for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
-        ASTNode value = entry.getValue();
-        // 0 is function name
-        for (int i = 1; i < value.getChildCount(); i++) {
-          ASTNode parameter = (ASTNode) value.getChild(i);
+    // Put parameters to aggregations in reduceValues
+    for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
+      ASTNode value = entry.getValue();
+      // 0 is function name
+      for (int i = 1; i < value.getChildCount(); i++) {
+        ASTNode parameter = (ASTNode) value.getChild(i);
+        if (reduceSinkOutputRowResolver.getExpression(parameter) == null) {
+          reduceValues.add(genExprNodeDesc(parameter,
+              reduceSinkInputRowResolver));
+          outputValueColumnNames
+              .add(getColumnInternalName(reduceValues.size() - 1));
+          String field = Utilities.ReduceField.VALUE.toString() + "."
+              + getColumnInternalName(reduceValues.size() - 1);
+          reduceSinkOutputRowResolver.putExpression(parameter, new ColumnInfo(field,
+              reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null,
+              false));
+        }
+      }
+    }
+  }
+
+  @SuppressWarnings("nls")
+  private Operator genCommonGroupByPlanReduceSinkOperator(QB qb, List<String> dests,
+      Operator inputOperatorInfo) throws SemanticException {
+
+    RowResolver reduceSinkInputRowResolver = opParseCtx.get(inputOperatorInfo)
+        .getRowResolver();
+    QBParseInfo parseInfo = qb.getParseInfo();
+    RowResolver reduceSinkOutputRowResolver = new RowResolver();
+    reduceSinkOutputRowResolver.setIsExprResolver(true);
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+
+    // The group by keys and distinct keys should be the same for all dests, so using the first
+    // one to produce these will be the same as using any other.
+    String dest = dests.get(0);
+
+    // Pre-compute group-by keys and store in reduceKeys
+    List<String> outputKeyColumnNames = new ArrayList<String>();
+    List<String> outputValueColumnNames = new ArrayList<String>();
+    List<ASTNode> grpByExprs = getGroupByForClause(parseInfo, dest);
+
+    ArrayList<ExprNodeDesc> reduceKeys = getReduceKeysForReduceSink(grpByExprs, dest,
+        reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames,
+        colExprMap);
+
+    List<List<Integer>> distinctColIndices = getDistinctColIndicesForReduceSink(parseInfo, dest,
+        reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames);
+
+    ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
+
+    // The dests can have different non-distinct aggregations, so we have to iterate over all of
+    // them
+    for (String destination : dests) {
+
+      getReduceValuesForReduceSinkNoMapAgg(parseInfo, dest, reduceSinkInputRowResolver,
+          reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues);
+
+      // Need to pass all of the columns used in the where clauses as reduce values
+      ASTNode whereClause = parseInfo.getWhrForClause(destination);
+      if (whereClause != null) {
+        List<ASTNode> columnExprs =
+            getColumnExprsFromASTNode(whereClause, reduceSinkInputRowResolver);
+        for (int i = 0; i < columnExprs.size(); i++) {
+          ASTNode parameter = columnExprs.get(i);
           if (reduceSinkOutputRowResolver.getExpression(parameter) == null) {
             reduceValues.add(genExprNodeDesc(parameter,
                 reduceSinkInputRowResolver));
@@ -2981,36 +3104,47 @@ public class SemanticAnalyzer extends Ba
           }
         }
       }
-    } else {
-      // Put partial aggregation results in reduceValues
-      int inputField = reduceKeys.size();
-
-      for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
-
-        TypeInfo type = reduceSinkInputRowResolver.getColumnInfos().get(
-            inputField).getType();
-        reduceValues.add(new ExprNodeColumnDesc(type,
-            getColumnInternalName(inputField), "", false));
-        inputField++;
-        outputValueColumnNames.add(getColumnInternalName(reduceValues.size() - 1));
-        String field = Utilities.ReduceField.VALUE.toString() + "."
-            + getColumnInternalName(reduceValues.size() - 1);
-        reduceSinkOutputRowResolver.putExpression(entry.getValue(),
-            new ColumnInfo(field, type, null, false));
-      }
     }
 
     ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
         OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys,
         grpByExprs.size(), reduceValues, distinctColIndices,
-        outputKeyColumnNames, outputValueColumnNames, true, -1, numPartitionFields,
-        numReducers), new RowSchema(reduceSinkOutputRowResolver
+        outputKeyColumnNames, outputValueColumnNames, true, -1, grpByExprs.size(),
+        -1), new RowSchema(reduceSinkOutputRowResolver
         .getColumnInfos()), inputOperatorInfo), reduceSinkOutputRowResolver);
     rsOp.setColumnExprMap(colExprMap);
     return rsOp;
   }
 
   /**
+   * Given an ASTNode, it returns all of the descendant ASTNodes which represent column expressions
+   *
+   * @param node
+   * @param inputRR
+   * @return
+   * @throws SemanticException
+   */
+  private List<ASTNode> getColumnExprsFromASTNode(ASTNode node, RowResolver inputRR)
+      throws SemanticException {
+
+    List<ASTNode> nodes = new ArrayList<ASTNode>();
+    if (node.getChildCount() == 0) {
+      return nodes;
+    }
+    for (int i = 0; i < node.getChildCount(); i++) {
+      ASTNode child = (ASTNode)node.getChild(i);
+      if (child.getType() == HiveParser.TOK_TABLE_OR_COL && child.getChild(0) != null &&
+          inputRR.get(null,
+                  BaseSemanticAnalyzer.unescapeIdentifier(child.getChild(0).getText())) != null) {
+        nodes.add(child);
+      } else {
+        nodes.addAll(getColumnExprsFromASTNode(child, inputRR));
+      }
+    }
+    return nodes;
+  }
+
+  /**
    * Generate the second ReduceSinkOperator for the Group By Plan
    * (parseInfo.getXXX(dest)). The new ReduceSinkOperator will be a child of
    * groupByOperatorInfo.
@@ -3223,6 +3357,99 @@ public class SemanticAnalyzer extends Ba
     return groupByOperatorInfo;
   }
 
+  @SuppressWarnings({"nls"})
+  private Operator genGroupByPlan1MRMultiReduceGB(List<String> dests, QB qb, Operator input)
+      throws SemanticException {
+
+    QBParseInfo parseInfo = qb.getParseInfo();
+
+    ExprNodeDesc previous = null;
+    Operator selectInput = input;
+
+    // In order to facilitate partition pruning, or the where clauses together and put them at the
+    // top of the operator tree, this could also reduce the amount of data going to the reducer
+    List<ExprNodeDesc.ExprNodeDescEqualityWrapper> whereExpressions =
+        new ArrayList<ExprNodeDesc.ExprNodeDescEqualityWrapper>();
+    for (String dest : dests) {
+      ASTNode whereExpr = parseInfo.getWhrForClause(dest);
+
+      if (whereExpr != null) {
+        OpParseContext inputCtx = opParseCtx.get(input);
+        RowResolver inputRR = inputCtx.getRowResolver();
+        ExprNodeDesc current = genExprNodeDesc((ASTNode)whereExpr.getChild(0), inputRR);
+
+        // Check the list of where expressions already added so they aren't duplicated
+        ExprNodeDesc.ExprNodeDescEqualityWrapper currentWrapped =
+            new ExprNodeDesc.ExprNodeDescEqualityWrapper(current);
+        if (!whereExpressions.contains(currentWrapped)) {
+          whereExpressions.add(currentWrapped);
+        } else {
+          continue;
+        }
+
+        if (previous == null) {
+          // If this is the first expression
+          previous = current;
+          continue;
+        }
+
+        GenericUDFOPOr or = new GenericUDFOPOr();
+        List<ExprNodeDesc> expressions = new ArrayList<ExprNodeDesc>(2);
+        expressions.add(previous);
+        expressions.add(current);
+        ExprNodeDesc orExpr =
+            new ExprNodeGenericFuncDesc(TypeInfoFactory.booleanTypeInfo, or, expressions);
+        previous = orExpr;
+      } else {
+        // If an expression does not have a where clause, there can be no common filter
+        previous = null;
+        break;
+      }
+    }
+
+    if (previous != null) {
+      OpParseContext inputCtx = opParseCtx.get(input);
+      RowResolver inputRR = inputCtx.getRowResolver();
+      FilterDesc orFilterDesc = new FilterDesc(previous, false);
+
+      selectInput = putOpInsertMap(OperatorFactory.getAndMakeChild(
+        orFilterDesc, new RowSchema(
+        inputRR.getColumnInfos()), input), inputRR);
+    }
+
+    // insert a select operator here used by the ColumnPruner to reduce
+    // the data to shuffle
+    Operator select = insertSelectAllPlanForGroupBy(selectInput);
+
+    // Generate ReduceSinkOperator
+    Operator reduceSinkOperatorInfo = genCommonGroupByPlanReduceSinkOperator(qb, dests, select);
+
+    // It is assumed throughout the code that a reducer has a single child, add a
+    // ForwardOperator so that we can add multiple filter/group by operators as children
+    RowResolver reduceSinkOperatorInfoRR = opParseCtx.get(reduceSinkOperatorInfo).getRowResolver();
+    Operator forwardOp = putOpInsertMap(OperatorFactory.getAndMakeChild(new ForwardDesc(),
+        new RowSchema(reduceSinkOperatorInfoRR.getColumnInfos()), reduceSinkOperatorInfo),
+        reduceSinkOperatorInfoRR);
+
+    Operator curr = forwardOp;
+
+    for (String dest : dests) {
+      curr = forwardOp;
+
+      if (parseInfo.getWhrForClause(dest) != null) {
+        curr = genFilterPlan(dest, qb, forwardOp);
+      }
+
+      // Generate GroupbyOperator
+      Operator groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
+          dest, curr, GroupByDesc.Mode.COMPLETE, null);
+
+      curr = genPostGroupByBodyPlan(groupByOperatorInfo, dest, qb);
+    }
+
+    return curr;
+  }
+
   static ArrayList<GenericUDAFEvaluator> getUDAFEvaluators(
       ArrayList<AggregationDesc> aggs) {
     ArrayList<GenericUDAFEvaluator> result = new ArrayList<GenericUDAFEvaluator>();
@@ -3290,40 +3517,6 @@ public class SemanticAnalyzer extends Ba
   }
 
   /**
-   * Generate a Multi Group-By plan using a single map-reduce job.
-   *
-   * @param dest
-   * @param qb
-   * @param input
-   * @return
-   * @throws SemanticException
-   *
-   *           Generate a Group-By plan using single map-reduce job, if there is
-   *           common group by key. Spray by the
-   *           common group by key set and compute
-   *           aggregates in the reduce. The agggregation evaluation
-   *           functions are as follows:
-   *
-   *           Partitioning Key: common group by key set
-   *
-   *           Sorting Key: group by keys, distinct keys
-   *
-   *           Reducer: iterate/terminate  (mode = COMPLETE)
-   *
-   */
-  private Operator<?> genGroupByPlan1MRMultiGroupBy(String dest, QB qb,
-      Operator<?> input) throws SemanticException {
-
-    QBParseInfo parseInfo = qb.getParseInfo();
-
-    // //////  Generate GroupbyOperator
-    Operator<?> groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
-        dest, input, GroupByDesc.Mode.COMPLETE, null);
-
-    return groupByOperatorInfo;
-  }
-
-  /**
    * Generate a Group-By plan using a 2 map-reduce jobs (5 operators will be
    * inserted):
    *
@@ -5489,7 +5682,7 @@ public class SemanticAnalyzer extends Ba
     }
   }
 
-  private Operator insertSelectAllPlanForGroupBy(String dest, Operator input)
+  private Operator insertSelectAllPlanForGroupBy(Operator input)
       throws SemanticException {
     OpParseContext inputCtx = opParseCtx.get(input);
     RowResolver inputRR = inputCtx.getRowResolver();
@@ -5528,7 +5721,7 @@ public class SemanticAnalyzer extends Ba
       return null;
     }
 
-    List<ExprNodeDesc> oldList = null;
+    List<ExprNodeDesc.ExprNodeDescEqualityWrapper> oldList = null;
     List<ASTNode> oldASTList = null;
 
     for (String dest : ks) {
@@ -5548,31 +5741,27 @@ public class SemanticAnalyzer extends Ba
         return null;
       }
 
-      List<ExprNodeDesc> currDestList = new ArrayList<ExprNodeDesc>();
+      List<ExprNodeDesc.ExprNodeDescEqualityWrapper> currDestList;
+      try {
+        currDestList = getDistinctExprs(qbp, dest, inputRR);
+      } catch (SemanticException e) {
+        return null;
+      }
+
       List<ASTNode> currASTList = new ArrayList<ASTNode>();
       for (ASTNode value: list) {
-        try {
-          // 0 is function name
-          for (int i = 1; i < value.getChildCount(); i++) {
-            ASTNode parameter = (ASTNode) value.getChild(i);
-            currDestList.add(genExprNodeDesc(parameter, inputRR));
-            currASTList.add(parameter);
-          }
-        } catch (SemanticException e) {
-          return null;
+        // 0 is function name
+        for (int i = 1; i < value.getChildCount(); i++) {
+          ASTNode parameter = (ASTNode) value.getChild(i);
+          currASTList.add(parameter);
         }
         if (oldList == null) {
           oldList = currDestList;
           oldASTList = currASTList;
         } else {
-          if (oldList.size() != currDestList.size()) {
+          if (!matchExprLists(oldList, currDestList)) {
             return null;
           }
-          for (int pos = 0; pos < oldList.size(); pos++) {
-            if (!oldList.get(pos).isSame(currDestList.get(pos))) {
-              return null;
-            }
-          }
         }
       }
     }
@@ -5671,185 +5860,130 @@ public class SemanticAnalyzer extends Ba
     return rsOp;
   }
 
-  // see if there are any distinct expressions
-  private boolean distinctExprsExists(QB qb) {
+  // Groups the clause names into lists so that any two clauses in the same list has the same
+  // group by and distinct keys and no clause appears in more than one list.  Returns a list of the
+  // lists of clauses.
+  private List<List<String>> getCommonGroupByDestGroups(QB qb, Operator input)
+      throws SemanticException {
+
+    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     QBParseInfo qbp = qb.getParseInfo();
 
     TreeSet<String> ks = new TreeSet<String>();
     ks.addAll(qbp.getClauseNames());
 
-    for (String dest : ks) {
-      List<ASTNode> list = qbp.getDistinctFuncExprsForClause(dest);
-      if (!list.isEmpty()) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  // return the common group by key set.
-  // Null if there are no common group by keys.
-  private List<ASTNode> getCommonGroupbyKeys(QB qb, Operator input) {
-    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
-    QBParseInfo qbp = qb.getParseInfo();
+    List<List<String>> commonGroupByDestGroups = new ArrayList<List<String>>();
 
-    Set<String> ks = qbp.getClauseNames();
-    // Go over all the destination tables
+    // If this is a trivial query block return
     if (ks.size() <= 1) {
-      return null;
+      List<String> oneList =  new ArrayList<String>(1);
+      if (ks.size() == 1) {
+        oneList.add(ks.first());
+      }
+      commonGroupByDestGroups.add(oneList);
+      return commonGroupByDestGroups;
     }
 
-    List<ASTNode> oldList = null;
+    List<List<ExprNodeDesc.ExprNodeDescEqualityWrapper>> sprayKeyLists =
+        new ArrayList<List<ExprNodeDesc.ExprNodeDescEqualityWrapper>>(ks.size());
 
+    // Iterate over each clause
     for (String dest : ks) {
-      // If a filter is present, common processing is not possible
-      if (qbp.getWhrForClause(dest) != null) {
-        return null;
-      }
 
-      //  if one of the sub-queries does not involve an aggregation, common
-      // processing is not possible
-      List<ASTNode> list = getGroupByForClause(qbp, dest);
-      if (list.isEmpty()) {
-        return null;
+      List<ExprNodeDesc.ExprNodeDescEqualityWrapper> sprayKeys =
+          getDistinctExprs(qbp, dest, inputRR);
+
+      // Add the group by expressions
+      List<ASTNode> grpByExprs = getGroupByForClause(qbp, dest);
+      for (ASTNode grpByExpr: grpByExprs) {
+        ExprNodeDesc.ExprNodeDescEqualityWrapper grpByExprWrapper =
+            new ExprNodeDesc.ExprNodeDescEqualityWrapper(genExprNodeDesc(grpByExpr, inputRR));
+        if (!sprayKeys.contains(grpByExprWrapper)) {
+          sprayKeys.add(grpByExprWrapper);
+        }
       }
-      if (oldList == null) {
-        oldList = new ArrayList<ASTNode>();
-        oldList.addAll(list);
-      } else {
-        int pos = 0;
-        for (pos = 0; pos < oldList.size(); pos++) {
-          if (pos < list.size()) {
-            if (!oldList.get(pos).toStringTree().equals(list.get(pos).toStringTree())) {
-              break;
-            }
-          } else {
-            break;
-          }
+
+      // Loop through each of the lists of exprs, looking for a match
+      boolean found = false;
+      for (int i = 0; i < sprayKeyLists.size(); i++) {
+
+        if (!matchExprLists(sprayKeyLists.get(i), sprayKeys)) {
+          continue;
         }
-        oldList = oldList.subList(0, pos);
+
+        // A match was found, so add the clause to the corresponding list
+        commonGroupByDestGroups.get(i).add(dest);
+        found = true;
+        break;
       }
-      if (oldList.isEmpty()) {
-        return null;
+
+      // No match was found, so create new entries
+      if (!found) {
+        sprayKeyLists.add(sprayKeys);
+        List<String> destGroup = new ArrayList<String>();
+        destGroup.add(dest);
+        commonGroupByDestGroups.add(destGroup);
       }
     }
-    return oldList;
-  }
 
-  /**
-   * Generates reduce sink for multigroupby query for non null common groupby set
-   *
-   *All groupby keys and distinct exprs are added to reduce keys. And rows are
-   *partitioned on common groupby key set.
-   *
-   * @param qb
-   * @param input
-   * @return
-   * @throws SemanticException
-   */
-  private Operator createCommonReduceSink1(QB qb, Operator input)
-  throws SemanticException {
-    // Go over all the tables and get common groupby key
-    List<ASTNode> cmonGbyExprs = getCommonGroupbyKeys(qb, input);
+    return commonGroupByDestGroups;
+  }
 
-    QBParseInfo qbp = qb.getParseInfo();
-    TreeSet<String> ks = new TreeSet<String>();
-    ks.addAll(qbp.getClauseNames());
+  // Returns whether or not two lists contain the same elements independent of order
+  private boolean matchExprLists(List<ExprNodeDesc.ExprNodeDescEqualityWrapper> list1,
+      List<ExprNodeDesc.ExprNodeDescEqualityWrapper> list2) {
 
-    // Pass the entire row
-    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
-    RowResolver reduceSinkOutputRowResolver = new RowResolver();
-    reduceSinkOutputRowResolver.setIsExprResolver(true);
-    ArrayList<ExprNodeDesc> reduceKeys = new ArrayList<ExprNodeDesc>();
-    ArrayList<ExprNodeDesc> reducePartKeys = new ArrayList<ExprNodeDesc>();
-    ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
-    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
-    List<String> outputColumnNames = new ArrayList<String>();
-    for (String dest : ks) {
-      List<ASTNode> grpByExprs = getGroupByForClause(qbp, dest);
-      for (int i = 0; i < grpByExprs.size(); ++i) {
-        ASTNode grpbyExpr = grpByExprs.get(i);
+    if (list1.size() != list2.size()) {
+      return false;
+    }
 
-        if (reduceSinkOutputRowResolver.getExpression(grpbyExpr) == null) {
-          ExprNodeDesc grpByExprNode = genExprNodeDesc(grpbyExpr, inputRR);
-          reduceKeys.add(grpByExprNode);
-          String field = Utilities.ReduceField.KEY.toString() + "."
-          + getColumnInternalName(reduceKeys.size() - 1);
-          ColumnInfo colInfo = new ColumnInfo(field, reduceKeys.get(
-              reduceKeys.size() - 1).getTypeInfo(), "", false);
-          reduceSinkOutputRowResolver.putExpression(grpbyExpr, colInfo);
-          outputColumnNames.add(getColumnInternalName(reduceKeys.size() - 1));
-          colExprMap.put(colInfo.getInternalName(), grpByExprNode);
-        }
+    for (ExprNodeDesc.ExprNodeDescEqualityWrapper exprNodeDesc : list1) {
+      if (!list2.contains(exprNodeDesc)) {
+        return false;
       }
     }
-    // Add distinct group-by exprs to reduceKeys
-    List<ASTNode> distExprs = getCommonDistinctExprs(qb, input);
-    if (distExprs != null) {
-      for (ASTNode distn : distExprs) {
-        if (reduceSinkOutputRowResolver.getExpression(distn) == null) {
-          ExprNodeDesc distExpr = genExprNodeDesc(distn, inputRR);
-          reduceKeys.add(distExpr);
-          String field = Utilities.ReduceField.KEY.toString() + "."
-              + getColumnInternalName(reduceKeys.size() - 1);
-          ColumnInfo colInfo = new ColumnInfo(field, reduceKeys.get(
-              reduceKeys.size() - 1).getTypeInfo(), "", false);
-          reduceSinkOutputRowResolver.putExpression(distn, colInfo);
-          outputColumnNames.add(getColumnInternalName(reduceKeys.size() - 1));
-          colExprMap.put(colInfo.getInternalName(), distExpr);
+
+    return true;
+  }
+
+  // Returns a list of the distinct exprs for a given clause name as
+  // ExprNodeDesc.ExprNodeDescEqualityWrapper without duplicates
+  private List<ExprNodeDesc.ExprNodeDescEqualityWrapper>
+      getDistinctExprs(QBParseInfo qbp, String dest, RowResolver inputRR) throws SemanticException {
+
+    List<ASTNode> distinctAggExprs = qbp.getDistinctFuncExprsForClause(dest);
+    List<ExprNodeDesc.ExprNodeDescEqualityWrapper> distinctExprs =
+        new ArrayList<ExprNodeDesc.ExprNodeDescEqualityWrapper>();
+
+    for (ASTNode distinctAggExpr: distinctAggExprs) {
+      // 0 is function name
+      for (int i = 1; i < distinctAggExpr.getChildCount(); i++) {
+        ASTNode parameter = (ASTNode) distinctAggExpr.getChild(i);
+        ExprNodeDesc.ExprNodeDescEqualityWrapper distinctExpr =
+            new ExprNodeDesc.ExprNodeDescEqualityWrapper(genExprNodeDesc(parameter, inputRR));
+        if (!distinctExprs.contains(distinctExpr)) {
+          distinctExprs.add(distinctExpr);
         }
       }
     }
-    // Add common groupby keys to partition keys
-    for (ASTNode gby : cmonGbyExprs) {
-      ExprNodeDesc distExpr = genExprNodeDesc(gby, inputRR);
-      reducePartKeys.add(distExpr);
-    }
-
-    // Go over all the aggregations
-    for (String dest : ks) {
 
-      // For each aggregation
-      HashMap<String, ASTNode> aggregationTrees = qbp
-      .getAggregationExprsForClause(dest);
-      assert (aggregationTrees != null);
+    return distinctExprs;
+  }
 
-      for (Map.Entry<String, ASTNode> entry : aggregationTrees.entrySet()) {
-        ASTNode value = entry.getValue();
-        value.getChild(0).getText();
+  // see if there are any distinct expressions
+  private boolean distinctExprsExists(QB qb) {
+    QBParseInfo qbp = qb.getParseInfo();
 
-        // 0 is the function name
-        for (int i = 1; i < value.getChildCount(); i++) {
-          ASTNode paraExpr = (ASTNode) value.getChild(i);
+    TreeSet<String> ks = new TreeSet<String>();
+    ks.addAll(qbp.getClauseNames());
 
-          if (reduceSinkOutputRowResolver.getExpression(paraExpr) == null) {
-            ExprNodeDesc paraExprNode = genExprNodeDesc(paraExpr, inputRR);
-            reduceValues.add(paraExprNode);
-            String field = Utilities.ReduceField.VALUE.toString() + "."
-                + getColumnInternalName(reduceValues.size() - 1);
-            ColumnInfo colInfo = new ColumnInfo(field, reduceValues.get(
-                reduceValues.size() - 1).getTypeInfo(), "", false);
-            reduceSinkOutputRowResolver.putExpression(paraExpr, colInfo);
-            outputColumnNames
-                .add(getColumnInternalName(reduceValues.size() - 1));
-          }
-        }
+    for (String dest : ks) {
+      List<ASTNode> list = qbp.getDistinctFuncExprsForClause(dest);
+      if (!list.isEmpty()) {
+        return true;
       }
     }
-    StringBuilder order = new StringBuilder();
-    for (int i = 0; i < reduceKeys.size(); i++) {
-      order.append("+");
-    }
-
-    ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
-        OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(
-            reduceKeys, reduceValues,
-            outputColumnNames, true, -1,
-            reducePartKeys, order.toString(), -1),
-            new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()), input),
-            reduceSinkOutputRowResolver);
-    rsOp.setColumnExprMap(colExprMap);
-    return rsOp;
+    return false;
   }
 
   @SuppressWarnings("nls")
@@ -5861,48 +5995,16 @@ public class SemanticAnalyzer extends Ba
     // currently. It doesnt matter whether he has asked to do
     // map-side aggregation or not. Map side aggregation is turned off
     List<ASTNode> commonDistinctExprs = getCommonDistinctExprs(qb, input);
-    List<ASTNode> commonGbyKeys = getCommonGroupbyKeys(qb, input);
-    LOG.warn("Common Gby keys:" + commonGbyKeys);
     boolean optimizeMultiGroupBy = commonDistinctExprs != null;
-    // Generate single MR job for multigroupby query if query has non-null common
-    // groupby key set and there are zero or one common distinct expression.
-    boolean singlemrMultiGroupBy =
-      conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEMR)
-      && commonGbyKeys != null && !commonGbyKeys.isEmpty() &&
-      (!distinctExprsExists(qb) || commonDistinctExprs != null);
 
     Operator curr = input;
 
-    // If there are multiple group-bys, map-side aggregation is turned off,
-    // and there are no filters.
-    // if there is a common groupby key set, spray by the common groupby key set
-    // and generate single mr job
-    if (singlemrMultiGroupBy) {
-      curr = createCommonReduceSink1(qb, input);
-
-      RowResolver currRR = opParseCtx.get(curr).getRowResolver();
-      // create a forward operator
-      input = putOpInsertMap(OperatorFactory.getAndMakeChild(new ForwardDesc(),
-          new RowSchema(currRR.getColumnInfos()), curr), currRR);
-
-      for (String dest : ks) {
-        curr = input;
-        curr = genGroupByPlan1MRMultiGroupBy(dest, qb, curr);
-        curr = genSelectPlan(dest, qb, curr);
-        Integer limit = qbp.getDestLimit(dest);
-        if (limit != null) {
-          curr = genLimitMapRedPlan(dest, qb, curr, limit.intValue(), true);
-          qb.getParseInfo().setOuterQueryLimit(limit.intValue());
-        }
-        curr = genFileSinkPlan(dest, qb, curr);
-      }
-    }
-    // and if there is a single distinct, optimize that. Spray initially by the
+    // if there is a single distinct, optimize that. Spray initially by the
     // distinct key,
     // no computation at the mapper. Have multiple group by operators at the
     // reducer - and then
     // proceed
-    else if (optimizeMultiGroupBy) {
+    if (optimizeMultiGroupBy) {
       curr = createCommonReduceSink(qb, input);
 
       RowResolver currRR = opParseCtx.get(curr).getRowResolver();
@@ -5922,107 +6024,160 @@ public class SemanticAnalyzer extends Ba
         curr = genFileSinkPlan(dest, qb, curr);
       }
     } else {
-      // Go over all the destination tables
-      for (String dest : ks) {
-        curr = input;
+      List<List<String>> commonGroupByDestGroups = null;
 
-        if (qbp.getWhrForClause(dest) != null) {
-          curr = genFilterPlan(dest, qb, curr);
+      // If we can put multiple group bys in a single reducer, determine suitable groups of
+      // expressions, otherwise treat all the expressions as a single group
+      if (conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEREDUCER)) {
+        try {
+          commonGroupByDestGroups = getCommonGroupByDestGroups(qb, curr);
+        } catch (SemanticException e) {
+          LOG.error("Failed to group clauses by common spray keys.", e);
         }
+      }
 
-        if (qbp.getAggregationExprsForClause(dest).size() != 0
-            || getGroupByForClause(qbp, dest).size() > 0) {
-          //multiple distincts is not supported with skew in data
-          if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
-             qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
-            throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.
-                getMsg());
-          }
-          // insert a select operator here used by the ColumnPruner to reduce
-          // the data to shuffle
-          curr = insertSelectAllPlanForGroupBy(dest, curr);
-          if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
-            if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
-              curr = genGroupByPlanMapAggr1MR(dest, qb, curr);
-            } else {
-              curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
+      if (commonGroupByDestGroups == null) {
+        commonGroupByDestGroups = new ArrayList<List<String>>();
+        commonGroupByDestGroups.add(new ArrayList<String>(ks));
+      }
+
+      if (!commonGroupByDestGroups.isEmpty()) {
+
+        // Iterate over each group of subqueries with the same group by/distinct keys
+        for (List<String> commonGroupByDestGroup : commonGroupByDestGroups) {
+          if (commonGroupByDestGroup.isEmpty()) {
+            continue;
+          }
+
+          String firstDest = commonGroupByDestGroup.get(0);
+          // Constructs a standard group by plan if:
+          // There is no other subquery with the same group by/distinct keys or
+          // (There are no aggregations in a representative query for the group and
+          //  There is no group by in that representative query) or
+          // The data is skewed or
+          // The conf variable used to control combining group bys into a signle reducer is false
+          if (commonGroupByDestGroup.size() == 1 ||
+              (qbp.getAggregationExprsForClause(firstDest).size() == 0 &&
+              getGroupByForClause(qbp, firstDest).size() == 0) ||
+              conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW) ||
+              !conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEREDUCER)) {
+
+            // Go over all the destination tables
+            for (String dest : commonGroupByDestGroup) {
+              curr = input;
+
+              if (qbp.getWhrForClause(dest) != null) {
+                curr = genFilterPlan(dest, qb, curr);
+              }
+
+              if (qbp.getAggregationExprsForClause(dest).size() != 0
+                  || getGroupByForClause(qbp, dest).size() > 0) {
+                //multiple distincts is not supported with skew in data
+                if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
+                   qbp.getDistinctFuncExprsForClause(dest).size() > 1) {
+                  throw new SemanticException(ErrorMsg.UNSUPPORTED_MULTIPLE_DISTINCTS.
+                      getMsg());
+                }
+                // insert a select operator here used by the ColumnPruner to reduce
+                // the data to shuffle
+                curr = insertSelectAllPlanForGroupBy(curr);
+                if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
+                  if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
+                    curr = genGroupByPlanMapAggr1MR(dest, qb, curr);
+                  } else {
+                    curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
+                  }
+                } else if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
+                  curr = genGroupByPlan2MR(dest, qb, curr);
+                } else {
+                  curr = genGroupByPlan1MR(dest, qb, curr);
+                }
+              }
+
+              curr = genPostGroupByBodyPlan(curr, dest, qb);
             }
-          } else if (conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
-            curr = genGroupByPlan2MR(dest, qb, curr);
           } else {
-            curr = genGroupByPlan1MR(dest, qb, curr);
+            curr = genGroupByPlan1MRMultiReduceGB(commonGroupByDestGroup, qb, input);
           }
         }
+      }
+    }
 
-        // Insert HAVING plan here
-        if (qbp.getHavingForClause(dest) != null) {
-          if (getGroupByForClause(qbp, dest).size() == 0) {
-            throw new SemanticException("HAVING specified without GROUP BY");
-          }
-          curr = genHavingPlan(dest, qb, curr);
-        }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created Body Plan for Query Block " + qb.getId());
+    }
 
-        curr = genSelectPlan(dest, qb, curr);
-        Integer limit = qbp.getDestLimit(dest);
+    return curr;
+  }
 
-        if (qbp.getClusterByForClause(dest) != null
-            || qbp.getDistributeByForClause(dest) != null
-            || qbp.getOrderByForClause(dest) != null
-            || qbp.getSortByForClause(dest) != null) {
+  private Operator genPostGroupByBodyPlan(Operator curr, String dest, QB qb)
+      throws SemanticException {
 
-          int numReducers = -1;
+    QBParseInfo qbp = qb.getParseInfo();
 
-          // Use only 1 reducer if order by is present
-          if (qbp.getOrderByForClause(dest) != null) {
-            numReducers = 1;
-          }
+    // Insert HAVING plan here
+    if (qbp.getHavingForClause(dest) != null) {
+      if (getGroupByForClause(qbp, dest).size() == 0) {
+        throw new SemanticException("HAVING specified without GROUP BY");
+      }
+      curr = genHavingPlan(dest, qb, curr);
+    }
 
-          curr = genReduceSinkPlan(dest, qb, curr, numReducers);
-        }
+    curr = genSelectPlan(dest, qb, curr);
+    Integer limit = qbp.getDestLimit(dest);
 
-        if (qbp.getIsSubQ()) {
-          if (limit != null) {
-            // In case of order by, only 1 reducer is used, so no need of
-            // another shuffle
-            curr = genLimitMapRedPlan(dest, qb, curr, limit.intValue(), qbp
-                .getOrderByForClause(dest) != null ? false : true);
-          }
-        } else {
-          curr = genConversionOps(dest, qb, curr);
-          // exact limit can be taken care of by the fetch operator
-          if (limit != null) {
-            boolean extraMRStep = true;
-
-            if (qb.getIsQuery() && qbp.getClusterByForClause(dest) == null
-                && qbp.getSortByForClause(dest) == null) {
-              extraMRStep = false;
-            }
+    if (qbp.getClusterByForClause(dest) != null
+        || qbp.getDistributeByForClause(dest) != null
+        || qbp.getOrderByForClause(dest) != null
+        || qbp.getSortByForClause(dest) != null) {
 
-            curr = genLimitMapRedPlan(dest, qb, curr, limit.intValue(),
-                extraMRStep);
-            qb.getParseInfo().setOuterQueryLimit(limit.intValue());
-          }
-          curr = genFileSinkPlan(dest, qb, curr);
-        }
-
-        // change curr ops row resolver's tab aliases to query alias if it
-        // exists
-        if (qb.getParseInfo().getAlias() != null) {
-          RowResolver rr = opParseCtx.get(curr).getRowResolver();
-          RowResolver newRR = new RowResolver();
-          String alias = qb.getParseInfo().getAlias();
-          for (ColumnInfo colInfo : rr.getColumnInfos()) {
-            String name = colInfo.getInternalName();
-            String[] tmp = rr.reverseLookup(name);
-            newRR.put(alias, tmp[1], colInfo);
-          }
-          opParseCtx.get(curr).setRowResolver(newRR);
-        }
+      int numReducers = -1;
+
+      // Use only 1 reducer if order by is present
+      if (qbp.getOrderByForClause(dest) != null) {
+        numReducers = 1;
       }
+
+      curr = genReduceSinkPlan(dest, qb, curr, numReducers);
     }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Created Body Plan for Query Block " + qb.getId());
+    if (qbp.getIsSubQ()) {
+      if (limit != null) {
+        // In case of order by, only 1 reducer is used, so no need of
+        // another shuffle
+        curr = genLimitMapRedPlan(dest, qb, curr, limit.intValue(), qbp
+            .getOrderByForClause(dest) != null ? false : true);
+      }
+    } else {
+      curr = genConversionOps(dest, qb, curr);
+      // exact limit can be taken care of by the fetch operator
+      if (limit != null) {
+        boolean extraMRStep = true;
+
+        if (qb.getIsQuery() && qbp.getClusterByForClause(dest) == null
+            && qbp.getSortByForClause(dest) == null) {
+          extraMRStep = false;
+        }
+
+        curr = genLimitMapRedPlan(dest, qb, curr, limit.intValue(),
+            extraMRStep);
+        qb.getParseInfo().setOuterQueryLimit(limit.intValue());
+      }
+      curr = genFileSinkPlan(dest, qb, curr);
+    }
+
+    // change curr ops row resolver's tab aliases to query alias if it
+    // exists
+    if (qb.getParseInfo().getAlias() != null) {
+      RowResolver rr = opParseCtx.get(curr).getRowResolver();
+      RowResolver newRR = new RowResolver();
+      String alias = qb.getParseInfo().getAlias();
+      for (ColumnInfo colInfo : rr.getColumnInfos()) {
+        String name = colInfo.getInternalName();
+        String[] tmp = rr.reverseLookup(name);
+        newRR.put(alias, tmp[1], colInfo);
+      }
+      opParseCtx.get(curr).setRowResolver(newRR);
     }
 
     return curr;
@@ -6466,40 +6621,40 @@ public class SemanticAnalyzer extends Ba
       tsDesc.addVirtualCols(vcList);
 
       String tblName = tab.getTableName();
-    	tableSpec tblSpec = qbp.getTableSpec(alias);
-    	Map<String, String> partSpec = tblSpec.getPartSpec();
+      tableSpec tblSpec = qbp.getTableSpec(alias);
+      Map<String, String> partSpec = tblSpec.getPartSpec();
 
-    	if (partSpec != null) {
-    	  List<String> cols = new ArrayList<String>();
-    	  cols.addAll(partSpec.keySet());
-    	  tsDesc.setPartColumns(cols);
-    	}
-
-    	// Theoretically the key prefix could be any unique string shared
-    	// between TableScanOperator (when publishing) and StatsTask (when aggregating).
-    	// Here we use
-    	//       table_name + partitionSec
-    	// as the prefix for easy of read during explain and debugging.
-    	// Currently, partition spec can only be static partition.
-    	String k = tblName + Path.SEPARATOR;
-    	tsDesc.setStatsAggPrefix(k);
-
-    	// set up WritenEntity for replication
-    	outputs.add(new WriteEntity(tab, true));
-
-    	// add WriteEntity for each matching partition
-    	if (tab.isPartitioned()) {
-    	  if (partSpec == null) {
-    	    throw new SemanticException(ErrorMsg.NEED_PARTITION_SPECIFICATION.getMsg());
-    	  }
-    	  List<Partition> partitions = qbp.getTableSpec().partitions;
-    	  if (partitions != null) {
-    	    for (Partition partn : partitions) {
-    	      // inputs.add(new ReadEntity(partn)); // is this needed at all?
-    	      outputs.add(new WriteEntity(partn, true));
+      if (partSpec != null) {
+        List<String> cols = new ArrayList<String>();
+        cols.addAll(partSpec.keySet());
+        tsDesc.setPartColumns(cols);
+      }
+
+      // Theoretically the key prefix could be any unique string shared
+      // between TableScanOperator (when publishing) and StatsTask (when aggregating).
+      // Here we use
+      //       table_name + partitionSec
+      // as the prefix for easy of read during explain and debugging.
+      // Currently, partition spec can only be static partition.
+      String k = tblName + Path.SEPARATOR;
+      tsDesc.setStatsAggPrefix(k);
+
+      // set up WritenEntity for replication
+      outputs.add(new WriteEntity(tab, true));
+
+      // add WriteEntity for each matching partition
+      if (tab.isPartitioned()) {
+        if (partSpec == null) {
+          throw new SemanticException(ErrorMsg.NEED_PARTITION_SPECIFICATION.getMsg());
+        }
+        List<Partition> partitions = qbp.getTableSpec().partitions;
+        if (partitions != null) {
+          for (Partition partn : partitions) {
+            // inputs.add(new ReadEntity(partn)); // is this needed at all?
+            outputs.add(new WriteEntity(partn, true));
           }
         }
-    	}
+      }
     }
   }
 
@@ -8206,7 +8361,7 @@ public class SemanticAnalyzer extends Ba
           break;
         }else{
           mrtask.setLocalMode(true);
-	}
+  }
       } catch (IOException e) {
         throw new SemanticException (e);
       }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java Tue Jan  3 18:10:34 2012
@@ -22,9 +22,9 @@ import java.io.Serializable;
 import java.util.List;
 
 import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 /**
  * ExprNodeDesc.
@@ -90,4 +90,31 @@ public abstract class ExprNodeDesc imple
     return this.getClass().getName();
   }
 
+  // This wraps an instance of an ExprNodeDesc, and makes equals work like isSame, see comment on
+  // isSame
+  public static class ExprNodeDescEqualityWrapper {
+    private ExprNodeDesc exprNodeDesc;
+
+    public ExprNodeDescEqualityWrapper(ExprNodeDesc exprNodeDesc) {
+      this.exprNodeDesc = exprNodeDesc;
+    }
+
+    public ExprNodeDesc getExprNodeDesc() {
+      return exprNodeDesc;
+    }
+
+    public void setExprNodeDesc(ExprNodeDesc exprNodeDesc) {
+      this.exprNodeDesc = exprNodeDesc;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+
+      if (other == null || !(other instanceof ExprNodeDescEqualityWrapper)) {
+        return false;
+      }
+
+      return this.exprNodeDesc.isSame(((ExprNodeDescEqualityWrapper)other).getExprNodeDesc());
+    }
+  }
 }

Modified: hive/trunk/ql/src/test/queries/clientpositive/groupby10.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby10.q?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby10.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby10.q Tue Jan  3 18:10:34 2012
@@ -1,4 +1,5 @@
 set hive.map.aggr=false;
+set hive.multigroupby.singlereducer=false;
 set hive.groupby.skewindata=true;
 
 CREATE TABLE dest1(key INT, val1 INT, val2 INT);
@@ -19,7 +20,7 @@ INSERT OVERWRITE TABLE dest2 SELECT INPU
 SELECT * from dest1;
 SELECT * from dest2;
 
-set hive.multigroupby.singlemr=true;
+set hive.multigroupby.singlereducer=true;
 
 EXPLAIN
 FROM INPUT 

Modified: hive/trunk/ql/src/test/queries/clientpositive/groupby7_map.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby7_map.q?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby7_map.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby7_map.q Tue Jan  3 18:10:34 2012
@@ -1,4 +1,5 @@
 set hive.map.aggr=true;
+set hive.multigroupby.singlereducer=false;
 set hive.groupby.skewindata=false;
 set mapred.reduce.tasks=31;
 

Added: hive/trunk/ql/src/test/queries/clientpositive/groupby7_map_multi_single_reducer.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby7_map_multi_single_reducer.q?rev=1226903&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby7_map_multi_single_reducer.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby7_map_multi_single_reducer.q Tue Jan  3 18:10:34 2012
@@ -0,0 +1,21 @@
+set hive.map.aggr=true;
+set hive.groupby.skewindata=false;
+set mapred.reduce.tasks=31;
+
+CREATE TABLE DEST1(key INT, value STRING) STORED AS TEXTFILE;
+CREATE TABLE DEST2(key INT, value STRING) STORED AS TEXTFILE;
+
+SET hive.exec.compress.intermediate=true;
+SET hive.exec.compress.output=true; 
+
+EXPLAIN
+FROM SRC
+INSERT OVERWRITE TABLE DEST1 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key
+INSERT OVERWRITE TABLE DEST2 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key;
+
+FROM SRC
+INSERT OVERWRITE TABLE DEST1 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key
+INSERT OVERWRITE TABLE DEST2 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key;
+
+SELECT DEST1.* FROM DEST1;
+SELECT DEST2.* FROM DEST2;

Modified: hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew.q?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew.q Tue Jan  3 18:10:34 2012
@@ -1,5 +1,5 @@
 set hive.map.aggr=false;
-
+set hive.multigroupby.singlereducer=false;
 set hive.groupby.skewindata=false;
 set mapred.reduce.tasks=31;
 

Added: hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew_multi_single_reducer.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew_multi_single_reducer.q?rev=1226903&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew_multi_single_reducer.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby7_noskew_multi_single_reducer.q Tue Jan  3 18:10:34 2012
@@ -0,0 +1,21 @@
+set hive.map.aggr=false;
+set hive.groupby.skewindata=false;
+set mapred.reduce.tasks=31;
+
+CREATE TABLE DEST1(key INT, value STRING) STORED AS TEXTFILE;
+CREATE TABLE DEST2(key INT, value STRING) STORED AS TEXTFILE;
+
+SET hive.exec.compress.intermediate=true;
+SET hive.exec.compress.output=true; 
+
+EXPLAIN
+FROM SRC
+INSERT OVERWRITE TABLE DEST1 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key limit 10
+INSERT OVERWRITE TABLE DEST2 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key limit 10;
+
+FROM SRC
+INSERT OVERWRITE TABLE DEST1 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key limit 10
+INSERT OVERWRITE TABLE DEST2 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key limit 10;
+
+SELECT DEST1.* FROM DEST1;
+SELECT DEST2.* FROM DEST2;

Modified: hive/trunk/ql/src/test/queries/clientpositive/groupby8.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby8.q?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby8.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby8.q Tue Jan  3 18:10:34 2012
@@ -16,7 +16,7 @@ INSERT OVERWRITE TABLE DEST2 SELECT SRC.
 SELECT DEST1.* FROM DEST1;
 SELECT DEST2.* FROM DEST2;
 
-set hive.multigroupby.singlemr=true;
+set hive.multigroupby.singlereducer=false;
 
 EXPLAIN
 FROM SRC

Modified: hive/trunk/ql/src/test/queries/clientpositive/groupby9.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby9.q?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby9.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby9.q Tue Jan  3 18:10:34 2012
@@ -26,7 +26,7 @@ INSERT OVERWRITE TABLE DEST2 SELECT SRC.
 SELECT DEST1.* FROM DEST1;
 SELECT DEST2.* FROM DEST2;
 
-set hive.multigroupby.singlemr=true;
+set hive.multigroupby.singlereducer=false;
 
 EXPLAIN
 FROM SRC

Added: hive/trunk/ql/src/test/queries/clientpositive/groupby_complex_types_multi_single_reducer.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby_complex_types_multi_single_reducer.q?rev=1226903&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby_complex_types_multi_single_reducer.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby_complex_types_multi_single_reducer.q Tue Jan  3 18:10:34 2012
@@ -0,0 +1,17 @@
+set hive.multigroupby.singlereducer=true;
+
+CREATE TABLE DEST1(key ARRAY<STRING>, value BIGINT) STORED AS TEXTFILE;
+CREATE TABLE DEST2(key MAP<STRING, STRING>, value BIGINT) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM SRC
+INSERT OVERWRITE TABLE DEST1 SELECT ARRAY(SRC.key), COUNT(1) GROUP BY ARRAY(SRC.key) limit 10
+INSERT OVERWRITE TABLE DEST2 SELECT MAP(SRC.key, SRC.value), COUNT(1) GROUP BY MAP(SRC.key, SRC.value) limit 10;
+
+FROM SRC
+INSERT OVERWRITE TABLE DEST1 SELECT ARRAY(SRC.key), COUNT(1) GROUP BY ARRAY(SRC.key) limit 10
+INSERT OVERWRITE TABLE DEST2 SELECT MAP(SRC.key, SRC.value), COUNT(1) GROUP BY MAP(SRC.key, SRC.value) limit 10;
+
+SELECT DEST1.* FROM DEST1;
+SELECT DEST2.* FROM DEST2;
+

Added: hive/trunk/ql/src/test/queries/clientpositive/groupby_multi_single_reducer.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/groupby_multi_single_reducer.q?rev=1226903&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/groupby_multi_single_reducer.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/groupby_multi_single_reducer.q Tue Jan  3 18:10:34 2012
@@ -0,0 +1,49 @@
+set hive.multigroupby.singlereducer=true;
+
+CREATE TABLE dest_g2(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE;
+CREATE TABLE dest_g3(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE;
+CREATE TABLE dest_g4(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE;
+CREATE TABLE dest_h2(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE;
+CREATE TABLE dest_h3(key STRING, c1 INT, c2 STRING, c3 INT, c4 INT) STORED AS TEXTFILE;
+
+EXPLAIN
+FROM src
+INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) >= 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g3 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) < 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g4 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1);
+
+FROM src
+INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) >= 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g3 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) < 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g4 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1);
+
+SELECT * FROM dest_g2;
+SELECT * FROM dest_g3;
+SELECT * FROM dest_g4;
+
+EXPLAIN
+FROM src
+INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) >= 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g3 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) < 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g4 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_h2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1), substr(src.key,2,1) LIMIT 10
+INSERT OVERWRITE TABLE dest_h3 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) >= 5 GROUP BY substr(src.key,1,1), substr(src.key,2,1);
+
+FROM src
+INSERT OVERWRITE TABLE dest_g2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) >= 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g3 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) < 5 GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_g4 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(DISTINCT substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1)
+INSERT OVERWRITE TABLE dest_h2 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(substr(src.value, 5)), count(src.value) GROUP BY substr(src.key,1,1), substr(src.key,2,1) LIMIT 10
+INSERT OVERWRITE TABLE dest_h3 SELECT substr(src.key,1,1), count(DISTINCT substr(src.value,5)), concat(substr(src.key,1,1),sum(substr(src.value,5))), sum(substr(src.value, 5)), count(src.value) WHERE substr(src.key,1,1) >= 5 GROUP BY substr(src.key,1,1), substr(src.key,2,1);
+
+SELECT * FROM dest_g2;
+SELECT * FROM dest_g3;
+SELECT * FROM dest_g4;
+SELECT * FROM dest_h2;
+SELECT * FROM dest_h3;
+
+DROP TABLE dest_g2;
+DROP TABLE dest_g3;
+DROP TABLE dest_g4;
+DROP TABLE dest_h2;
+DROP TABLE dest_h3;

Modified: hive/trunk/ql/src/test/queries/clientpositive/multigroupby_singlemr.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/multigroupby_singlemr.q?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/multigroupby_singlemr.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/multigroupby_singlemr.q Tue Jan  3 18:10:34 2012
@@ -1,5 +1,3 @@
-set hive.multigroupby.singlemr=true;
-
 CREATE TABLE TBL(C1 INT, C2 INT, C3 INT, C4 INT);
 
 CREATE TABLE DEST1(d1 INT, d2 INT) STORED AS TEXTFILE;

Modified: hive/trunk/ql/src/test/results/clientpositive/groupby10.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/groupby10.q.out?rev=1226903&r1=1226902&r2=1226903&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/groupby10.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/groupby10.q.out Tue Jan  3 18:10:34 2012
@@ -343,10 +343,12 @@ ABSTRACT SYNTAX TREE:
 
 STAGE DEPENDENCIES:
   Stage-2 is a root stage
-  Stage-0 depends on stages: Stage-2
-  Stage-3 depends on stages: Stage-0
-  Stage-1 depends on stages: Stage-2
-  Stage-4 depends on stages: Stage-1
+  Stage-3 depends on stages: Stage-2
+  Stage-0 depends on stages: Stage-3
+  Stage-4 depends on stages: Stage-0
+  Stage-5 depends on stages: Stage-2
+  Stage-1 depends on stages: Stage-5
+  Stage-6 depends on stages: Stage-1
 
 STAGE PLANS:
   Stage: Stage-2
@@ -357,89 +359,106 @@ STAGE PLANS:
             alias: input
             Reduce Output Operator
               key expressions:
-                    expr: key
-                    type: int
                     expr: substr(value, 5)
                     type: string
-              sort order: ++
+              sort order: +
               Map-reduce partition columns:
+                    expr: substr(value, 5)
+                    type: string
+              tag: -1
+              value expressions:
                     expr: key
                     type: int
-              tag: -1
       Reduce Operator Tree:
         Forward
           Group By Operator
             aggregations:
-                  expr: count(KEY._col1)
-                  expr: count(DISTINCT KEY._col1)
+                  expr: count(KEY._col0)
+                  expr: count(DISTINCT KEY._col0)
             bucketGroup: false
             keys:
-                  expr: KEY._col0
+                  expr: VALUE._col0
                   type: int
-            mode: complete
+            mode: hash
             outputColumnNames: _col0, _col1, _col2
-            Select Operator
-              expressions:
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+          Group By Operator
+            aggregations:
+                  expr: sum(KEY._col0)
+                  expr: sum(DISTINCT KEY._col0)
+            bucketGroup: false
+            keys:
+                  expr: VALUE._col0
+                  type: int
+            mode: hash
+            outputColumnNames: _col0, _col1, _col2
+            File Output Operator
+              compressed: false
+              GlobalTableId: 0
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+
+  Stage: Stage-3
+    Map Reduce
+      Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+            Reduce Output Operator
+              key expressions:
                     expr: _col0
                     type: int
+              sort order: +
+              Map-reduce partition columns:
+                    expr: _col0
+                    type: int
+              tag: -1
+              value expressions:
                     expr: _col1
                     type: bigint
                     expr: _col2
                     type: bigint
-              outputColumnNames: _col0, _col1, _col2
-              Select Operator
-                expressions:
-                      expr: _col0
-                      type: int
-                      expr: UDFToInteger(_col1)
-                      type: int
-                      expr: UDFToInteger(_col2)
-                      type: int
-                outputColumnNames: _col0, _col1, _col2
-                File Output Operator
-                  compressed: false
-                  GlobalTableId: 1
-                  table:
-                      input format: org.apache.hadoop.mapred.TextInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                      name: default.dest1
-          Group By Operator
-            aggregations:
-                  expr: sum(KEY._col1)
-                  expr: sum(DISTINCT KEY._col1)
-            bucketGroup: false
-            keys:
-                  expr: KEY._col0
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(VALUE._col0)
+                expr: count(VALUE._col1)
+          bucketGroup: false
+          keys:
+                expr: KEY._col0
+                type: int
+          mode: final
+          outputColumnNames: _col0, _col1, _col2
+          Select Operator
+            expressions:
+                  expr: _col0
                   type: int
-            mode: complete
+                  expr: _col1
+                  type: bigint
+                  expr: _col2
+                  type: bigint
             outputColumnNames: _col0, _col1, _col2
             Select Operator
               expressions:
                     expr: _col0
                     type: int
-                    expr: _col1
-                    type: double
-                    expr: _col2
-                    type: double
+                    expr: UDFToInteger(_col1)
+                    type: int
+                    expr: UDFToInteger(_col2)
+                    type: int
               outputColumnNames: _col0, _col1, _col2
-              Select Operator
-                expressions:
-                      expr: _col0
-                      type: int
-                      expr: UDFToInteger(_col1)
-                      type: int
-                      expr: UDFToInteger(_col2)
-                      type: int
-                outputColumnNames: _col0, _col1, _col2
-                File Output Operator
-                  compressed: false
-                  GlobalTableId: 2
-                  table:
-                      input format: org.apache.hadoop.mapred.TextInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                      name: default.dest2
+              File Output Operator
+                compressed: false
+                GlobalTableId: 1
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    name: default.dest1
 
   Stage: Stage-0
     Move Operator
@@ -451,9 +470,65 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest1
 
-  Stage: Stage-3
+  Stage: Stage-4
     Stats-Aggr Operator
 
+  Stage: Stage-5
+    Map Reduce
+      Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+            Reduce Output Operator
+              key expressions:
+                    expr: _col0
+                    type: int
+              sort order: +
+              Map-reduce partition columns:
+                    expr: _col0
+                    type: int
+              tag: -1
+              value expressions:
+                    expr: _col1
+                    type: double
+                    expr: _col2
+                    type: double
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: sum(VALUE._col0)
+                expr: sum(VALUE._col1)
+          bucketGroup: false
+          keys:
+                expr: KEY._col0
+                type: int
+          mode: final
+          outputColumnNames: _col0, _col1, _col2
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: int
+                  expr: _col1
+                  type: double
+                  expr: _col2
+                  type: double
+            outputColumnNames: _col0, _col1, _col2
+            Select Operator
+              expressions:
+                    expr: _col0
+                    type: int
+                    expr: UDFToInteger(_col1)
+                    type: int
+                    expr: UDFToInteger(_col2)
+                    type: int
+              outputColumnNames: _col0, _col1, _col2
+              File Output Operator
+                compressed: false
+                GlobalTableId: 2
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    name: default.dest2
+
   Stage: Stage-1
     Move Operator
       tables:
@@ -464,7 +539,7 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest2
 
-  Stage: Stage-4
+  Stage: Stage-6
     Stats-Aggr Operator