You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/12 19:35:20 UTC

svn commit: r1467364 - in /hive/branches/vectorization: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/s...

Author: hashutosh
Date: Fri Apr 12 17:35:19 2013
New Revision: 1467364

URL: http://svn.apache.org/r1467364
Log:
Merged in with trunk

Added:
    hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_1.q
      - copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_1.q
    hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_2.q
      - copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_2.q
    hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_3.q
      - copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_3.q
    hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_4.q
      - copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_4.q
    hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_5.q
      - copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_5.q
    hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q
      - copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_6.q
    hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_7.q
      - copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_7.q
    hive/branches/vectorization/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_8.q
      - copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/bucketsortoptimize_insert_8.q
    hive/branches/vectorization/ql/src/test/queries/clientpositive/multi_insert_lateral_view.q
      - copied unchanged from r1467363, hive/trunk/ql/src/test/queries/clientpositive/multi_insert_lateral_view.q
    hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_1.q.out
      - copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_1.q.out
    hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_2.q.out
      - copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_2.q.out
    hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_3.q.out
      - copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_3.q.out
    hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_4.q.out
      - copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_4.q.out
    hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_5.q.out
      - copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_5.q.out
    hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_6.q.out
      - copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_6.q.out
    hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_7.q.out
      - copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_7.q.out
    hive/branches/vectorization/ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out
      - copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/bucketsortoptimize_insert_8.q.out
    hive/branches/vectorization/ql/src/test/results/clientpositive/multi_insert_lateral_view.q.out
      - copied unchanged from r1467363, hive/trunk/ql/src/test/results/clientpositive/multi_insert_lateral_view.q.out
Modified:
    hive/branches/vectorization/   (props changed)
    hive/branches/vectorization/build.properties
    hive/branches/vectorization/build.xml
    hive/branches/vectorization/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/vectorization/conf/hive-default.xml.template
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java

Propchange: hive/branches/vectorization/
------------------------------------------------------------------------------
    svn:mergeinfo = /hive/trunk:1466908-1467363

Modified: hive/branches/vectorization/build.properties
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/build.properties?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/build.properties (original)
+++ hive/branches/vectorization/build.properties Fri Apr 12 17:35:19 2013
@@ -17,6 +17,7 @@
 Name=Hive
 name=hive
 version=0.12.0-SNAPSHOT
+hcatalog.version=0.11.0-SNAPSHOT
 year=2012
 
 javac.debug=on

Modified: hive/branches/vectorization/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/build.xml?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/build.xml (original)
+++ hive/branches/vectorization/build.xml Fri Apr 12 17:35:19 2013
@@ -515,7 +515,7 @@
         inheritAll="false"/>
     <mkdir dir="${build.dir.hive}/hcatalog"/>
     <copy todir="${build.dir.hive}/hcatalog">
-        <fileset dir="${hive.root}/hcatalog/build/hcatalog-${version}"/>
+        <fileset dir="${hive.root}/hcatalog/build/hcatalog-${hcatalog.version}"/>
     </copy>
 
     <!-- special case because builtins compilation depends on packaging
@@ -1081,7 +1081,7 @@
           todir="${mvn.jar.dir}" />
     <copy file="${build.dir.hive}/metastore/hive-metastore-${version}.jar"
           todir="${mvn.jar.dir}" />
-    <copy file="${build.dir.hive}/hcatalog/hive-hcatalog-${version}.jar"
+    <copy file="${build.dir.hive}/hcatalog/hive-hcatalog-${hcatalog.version}.jar"
           todir="${mvn.jar.dir}" />
     <copy file="${build.dir.hive}/pdk/hive-pdk-${version}.jar"
           todir="${mvn.jar.dir}" />
@@ -1116,7 +1116,7 @@
     <copy file="${build.dir.hive}/metastore/pom.xml"
           tofile="${mvn.pom.dir}/hive-metastore-${version}.pom" />
     <copy file="${build.dir.hive}/hcatalog/pom.xml"
-          tofile="${mvn.pom.dir}/hive-hcatalog-${version}.pom" />
+          tofile="${mvn.pom.dir}/hive-hcatalog-${hcatalog.version}.pom" />
     <copy file="${build.dir.hive}/pdk/pom.xml"
           tofile="${mvn.pom.dir}/hive-pdk-${version}.pom" />
     <copy file="${build.dir.hive}/ql/pom.xml"
@@ -1375,12 +1375,12 @@
 
     <!-- hive-hcatalog -->
     <sign-artifact
-        input.file="${mvn.pom.dir}/hive-hcatalog-${version}.jar"
-        output.file="${mvn.pom.dir}/hive-hcatalog-${version}.jar.asc"
+        input.file="${mvn.pom.dir}/hive-hcatalog-${hcatalog.version}.jar"
+        output.file="${mvn.pom.dir}/hive-hcatalog-${hcatalog.version}.jar.asc"
         gpg.passphrase="${gpg.passphrase}"/>
     <sign-artifact
-        input.file="${mvn.pom.dir}/hive-hcatalog-${version}.pom"
-        output.file="${mvn.pom.dir}/hive-hcatalog-${version}.pom.asc"
+        input.file="${mvn.pom.dir}/hive-hcatalog-${hcatalog.version}.pom"
+        output.file="${mvn.pom.dir}/hive-hcatalog-${hcatalog.version}.pom.asc"
         gpg.passphrase="${gpg.passphrase}"/>
 
     <!-- hive-pdk -->

Modified: hive/branches/vectorization/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/vectorization/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Apr 12 17:35:19 2013
@@ -523,7 +523,7 @@ public class HiveConf extends Configurat
     HIVE_AUTO_SORTMERGE_JOIN("hive.auto.convert.sortmerge.join", false),
     HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR(
         "hive.auto.convert.sortmerge.join.bigtable.selection.policy",
-        "org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ"),
+        "org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ"),
 
     HIVESCRIPTOPERATORTRUST("hive.exec.script.trust", false),
     HIVEROWOFFSET("hive.exec.rowoffset", false),

Modified: hive/branches/vectorization/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/conf/hive-default.xml.template?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/conf/hive-default.xml.template (original)
+++ hive/branches/vectorization/conf/hive-default.xml.template Fri Apr 12 17:35:19 2013
@@ -969,9 +969,11 @@
 
 <property>
   <name>hive.auto.convert.sortmerge.join.bigtable.selection.policy</name>
-  <value>org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ</value>
+  <value>org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ</value>
   <description>The policy to choose the big table for automatic conversion to sort-merge join.
-    By default, the leftmost table is assigned the big table. Other policies are based on size:
+    By default, the table with the largest partitions is assigned the big table. All policies are:
+    . based on position of the table - the leftmost table is selected
+    org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSelectorForAutoSMJ.
     . based on total size (all the partitions selected in the query) of the table 
     org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ.
     . based on average size (all the partitions selected in the query) of the table 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Fri Apr 12 17:35:19 2013
@@ -385,7 +385,7 @@ public abstract class CommonJoinOperator
   // all evaluation should be processed here for valid aliasFilterTags
   //
   // for MapJoin, filter tag is pre-calculated in MapredLocalTask and stored with value.
-  // when reading the hashtable, MapJoinObjectValue calcuates alias filter and provide it to join
+  // when reading the hashtable, MapJoinObjectValue calculates alias filter and provides it to join
   protected ArrayList<Object> getFilteredValue(byte alias, Object row) throws HiveException {
     boolean hasFilter = hasFilter(alias);
     ArrayList<Object> nr = JoinUtil.computeValues(row, joinValues[alias],

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Apr 12 17:35:19 2013
@@ -1549,6 +1549,7 @@ public abstract class Operator<T extends
       start++;
     }
     builder.append(name);
+    start += name.length();
     if (added) {
       if (op.getNumChild() > 0) {
         List<Operator<?>> children = op.getChildOperators();
@@ -1559,7 +1560,7 @@ public abstract class Operator<T extends
               builder.append(' ');
             }
           }
-          toString(builder, visited, children.get(i), start += name.length());
+          toString(builder, visited, children.get(i), start);
         }
       }
       return true;

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Fri Apr 12 17:35:19 2013
@@ -98,10 +98,11 @@ public class TableScanOperator extends O
     // in the execution context. This is needed for the following scenario:
     // insert overwrite table T1 select * from T2;
     // where T1 and T2 are sorted/bucketed by the same keys into the same number of buckets
-    // Although one mapper per file is used (bucketizedinputhiveinput), it is possible that
+    // Although one mapper per file is used (BucketizedInputHiveInput), it is possible that
     // any mapper can pick up any file (depending on the size of the files). The bucket number
     // corresponding to the input file is stored to name the output bucket file appropriately.
-    Map<String, Integer> bucketNameMapping = conf != null ? conf.getBucketFileNameMapping() : null;
+    Map<String, Integer> bucketNameMapping =
+        (conf != null) ? conf.getBucketFileNameMapping() : null;
     if ((bucketNameMapping != null) && (!bucketNameMapping.isEmpty())) {
       String currentInputFile = getExecContext().getCurrentInputFile();
       getExecContext().setFileId(Integer.toString(bucketNameMapping.get(

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Fri Apr 12 17:35:19 2013
@@ -25,11 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hive.common.ObjectPair;
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.exec.ExtractOperator;
@@ -37,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Fi
 import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
@@ -56,6 +54,7 @@ import org.apache.hadoop.hive.ql.parse.S
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 
 /**
@@ -64,12 +63,15 @@ import org.apache.hadoop.hive.ql.plan.Se
  * insert overwrite table T1 select * from T2;
  * where T1 and T2 are bucketized/sorted on the same keys, we don't need a reducer to
  * enforce bucketing and sorting.
+ *
+ * It also optimizes queries of the form:
+ * insert overwrite table T3
+ * select * from T1 join T2 on T1.key = T2.key
+ * where T1, T2 and T3 are bucketized/sorted on the same key 'key', we don't need a reducer
+ * to enforce bucketing and sorting
  */
 public class BucketingSortingReduceSinkOptimizer implements Transform {
 
-  private static final Log LOG = LogFactory.getLog(BucketingSortingReduceSinkOptimizer.class
-      .getName());
-
   public BucketingSortingReduceSinkOptimizer() {
   }
 
@@ -77,7 +79,6 @@ public class BucketingSortingReduceSinkO
   public ParseContext transform(ParseContext pctx) throws SemanticException {
 
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    HiveConf conf = pctx.getConf();
 
     // process reduce sink added by hive.enforce.bucketing or hive.enforce.sorting
     opRules.put(new RuleRegExp("R1",
@@ -90,7 +91,7 @@ public class BucketingSortingReduceSinkO
     Dispatcher disp = new DefaultRuleDispatcher(getDefaultProc(), opRules, null);
     GraphWalker ogw = new DefaultGraphWalker(disp);
 
-    // Create a list of topop nodes
+    // Create a list of top nodes
     ArrayList<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.getTopOps().values());
     ogw.startWalking(topNodes, null);
@@ -117,7 +118,6 @@ public class BucketingSortingReduceSinkO
    *
    */
   public class BucketSortReduceSinkProcessor implements NodeProcessor {
-
     protected ParseContext pGraphContext;
 
     public BucketSortReduceSinkProcessor(ParseContext pGraphContext) {
@@ -142,28 +142,33 @@ public class BucketingSortingReduceSinkO
     }
 
     // Get the sort positions and sort order for the table
-    private List<ObjectPair<Integer, Integer>> getSortPositions(List<Order> tabSortCols,
+    // The sort order contains whether the sorting is happening ascending or descending
+    private ObjectPair<List<Integer>, List<Integer>> getSortPositionsOrder(
+        List<Order> tabSortCols,
         List<FieldSchema> tabCols) {
-      List<ObjectPair<Integer, Integer>> posns = new ArrayList<ObjectPair<Integer, Integer>>();
+      List<Integer> sortPositions = new ArrayList<Integer>();
+      List<Integer> sortOrders = new ArrayList<Integer>();
       for (Order sortCol : tabSortCols) {
         int pos = 0;
         for (FieldSchema tabCol : tabCols) {
           if (sortCol.getCol().equals(tabCol.getName())) {
-            posns.add(new ObjectPair<Integer, Integer>(pos, sortCol.getOrder()));
+            sortPositions.add(pos);
+            sortOrders.add(sortCol.getOrder());
             break;
           }
           pos++;
         }
       }
-      return posns;
+      return new ObjectPair<List<Integer>, List<Integer>>(sortPositions, sortOrders);
     }
 
-    // Return true if the parition is bucketed/sorted by the specified positions
+    // Return true if the partition is bucketed/sorted by the specified positions
     // The number of buckets, the sort order should also match along with the
     // columns which are bucketed/sorted
     private boolean checkPartition(Partition partition,
         List<Integer> bucketPositionsDest,
-        List<ObjectPair<Integer, Integer>> sortPositionsDest,
+        List<Integer> sortPositionsDest,
+        List<Integer> sortOrderDest,
         int numBucketsDest) {
       // The bucketing and sorting positions should exactly match
       int numBuckets = partition.getBucketCount();
@@ -173,10 +178,11 @@ public class BucketingSortingReduceSinkO
 
       List<Integer> partnBucketPositions =
           getBucketPositions(partition.getBucketCols(), partition.getTable().getCols());
-      List<ObjectPair<Integer, Integer>> partnSortPositions =
-          getSortPositions(partition.getSortCols(), partition.getTable().getCols());
+      ObjectPair<List<Integer>, List<Integer>> partnSortPositionsOrder =
+          getSortPositionsOrder(partition.getSortCols(), partition.getTable().getCols());
       return bucketPositionsDest.equals(partnBucketPositions) &&
-          sortPositionsDest.equals(partnSortPositions);
+          sortPositionsDest.equals(partnSortPositionsOrder.getFirst()) &&
+          sortOrderDest.equals(partnSortPositionsOrder.getSecond());
     }
 
     // Return true if the table is bucketed/sorted by the specified positions
@@ -184,7 +190,8 @@ public class BucketingSortingReduceSinkO
     // columns which are bucketed/sorted
     private boolean checkTable(Table table,
         List<Integer> bucketPositionsDest,
-        List<ObjectPair<Integer, Integer>> sortPositionsDest,
+        List<Integer> sortPositionsDest,
+        List<Integer> sortOrderDest,
         int numBucketsDest) {
       // The bucketing and sorting positions should exactly match
       int numBuckets = table.getNumBuckets();
@@ -194,12 +201,17 @@ public class BucketingSortingReduceSinkO
 
       List<Integer> tableBucketPositions =
           getBucketPositions(table.getBucketCols(), table.getCols());
-      List<ObjectPair<Integer, Integer>> tableSortPositions =
-          getSortPositions(table.getSortCols(), table.getCols());
+      ObjectPair<List<Integer>, List<Integer>> tableSortPositionsOrder =
+          getSortPositionsOrder(table.getSortCols(), table.getCols());
       return bucketPositionsDest.equals(tableBucketPositions) &&
-          sortPositionsDest.equals(tableSortPositions);
+          sortPositionsDest.equals(tableSortPositionsOrder.getFirst()) &&
+          sortOrderDest.equals(tableSortPositionsOrder.getSecond());
     }
 
+    // Store the bucket path to bucket number mapping in the table scan operator.
+    // Although one mapper per file is used (BucketizedInputHiveInput), it is possible that
+    // any mapper can pick up any file (depending on the size of the files). The bucket number
+    // corresponding to the input file is stored to name the output bucket file appropriately.
     private void storeBucketPathMapping(TableScanOperator tsOp, FileStatus[] srcs) {
       Map<String, Integer> bucketFileNameMapping = new HashMap<String, Integer>();
       for (int pos = 0; pos < srcs.length; pos++) {
@@ -222,12 +234,12 @@ public class BucketingSortingReduceSinkO
       // Store the mapping -> path, bucket number
       // This is needed since for the map-only job, any mapper can process any file.
       // For eg: if mapper 1 is processing the file corresponding to bucket 2, it should
-      // also output the file correspodning to bucket 2 of the output.
+      // also output the file corresponding to bucket 2 of the output.
       storeBucketPathMapping(tsOp, srcs);
     }
 
     // Remove the reduce sink operator
-    // Use bucketized hive input format so that one mapper processes exactly one file
+    // Use BucketizedHiveInputFormat so that one mapper processes exactly one file
     private void removeReduceSink(ReduceSinkOperator rsOp,
         TableScanOperator tsOp,
         FileSinkOperator fsOp) {
@@ -251,6 +263,97 @@ public class BucketingSortingReduceSinkO
       return -1;
     }
 
+    // The output columns for the destination table should match with the join keys
+    // This is to handle queries of the form:
+    // insert overwrite table T3
+    // select T1.key, T1.key2, UDF(T1.value, T2.value)
+    // from T1 join T2 on T1.key = T2.key and T1.key2 = T2.key2
+    // where T1, T2 and T3 are bucketized/sorted on key and key2
+    // Assuming T1 is the table on which the mapper is run, the following is true:
+    // . The number of buckets for T1 and T3 should be same
+    // . The bucketing/sorting columns for T1, T2 and T3 should be same
+    // . The sort order of T1 should match with the sort order for T3.
+    // . If T1 is partitioned, only a single partition of T1 can be selected.
+    // . The select list should contain either (T1.key, T1.key2) or (T2.key, T2.key2)
+    // . After the join, only selects and filters are allowed.
+    private boolean validateSMBJoinKeys(SMBJoinDesc smbJoinDesc,
+        List<ExprNodeColumnDesc> sourceTableBucketCols,
+        List<ExprNodeColumnDesc> sourceTableSortCols,
+        List<Integer> sortOrder) {
+      // The sort-merge join creates the output sorted and bucketized by the same columns.
+      // This can be relaxed in the future if there is a requirement.
+      if (!sourceTableBucketCols.equals(sourceTableSortCols)) {
+        return false;
+      }
+
+      // Get the total number of columns selected, and for each output column, store the
+      // base table it points to. For
+      // insert overwrite table T3
+      // select T1.key, T1.key2, UDF(T1.value, T2.value)
+      // from T1 join T2 on T1.key = T2.key and T1.key2 = T2.key2
+      // the following arrays are created
+      // [0, 0, 0, 1] --> [T1, T1, T1, T2] (table mapping)
+      // [0, 1, 2, 0] --> [T1.0, T1.1, T1.2, T2.0] (table columns mapping)
+      Byte[] tagOrder = smbJoinDesc.getTagOrder();
+      Map<Byte, List<Integer>> retainList = smbJoinDesc.getRetainList();
+      int totalNumberColumns = 0;
+      for (Byte tag : tagOrder) {
+        totalNumberColumns += retainList.get(tag).size();
+      }
+
+      byte[] columnTableMappings = new byte[totalNumberColumns];
+      int[] columnNumberMappings = new int[totalNumberColumns];
+      int currentColumnPosition = 0;
+      for (Byte tag : tagOrder) {
+        for (int pos = 0; pos < retainList.get(tag).size(); pos++) {
+          columnTableMappings[currentColumnPosition] = tag;
+          columnNumberMappings[currentColumnPosition] = pos;
+          currentColumnPosition++;
+        }
+      }
+
+      // All output columns used for bucketing/sorting of the destination table should
+      // belong to the same input table
+      //   insert overwrite table T3
+      //   select T1.key, T2.key2, UDF(T1.value, T2.value)
+      //   from T1 join T2 on T1.key = T2.key and T1.key2 = T2.key2
+      // is not optimized, whereas the insert is optimized if the select list is either changed to
+      // (T1.key, T1.key2, UDF(T1.value, T2.value)) or (T2.key, T2.key2, UDF(T1.value, T2.value))
+      // Get the input table and make sure the keys match
+      List<String> outputColumnNames = smbJoinDesc.getOutputColumnNames();
+      byte tableTag = -1;
+      int[] columnNumbersExprList = new int[sourceTableBucketCols.size()];
+      int currentColPosition = 0;
+      for (ExprNodeColumnDesc bucketCol : sourceTableBucketCols) {
+        String colName = bucketCol.getColumn();
+        int colNumber = outputColumnNames.indexOf(colName);
+        if (colNumber < 0) {
+          return false;
+        }
+        if (tableTag < 0) {
+          tableTag = columnTableMappings[colNumber];
+        }
+        else if (tableTag != columnTableMappings[colNumber]) {
+          return false;
+        }
+        columnNumbersExprList[currentColPosition++] = columnNumberMappings[colNumber];
+      }
+
+      List<ExprNodeDesc> allExprs = smbJoinDesc.getExprs().get(tableTag);
+      List<ExprNodeDesc> keysSelectedTable = smbJoinDesc.getKeys().get(tableTag);
+      currentColPosition = 0;
+      for (ExprNodeDesc keySelectedTable : keysSelectedTable) {
+        if (!(keySelectedTable instanceof ExprNodeColumnDesc)) {
+          return false;
+        }
+        if (!allExprs.get(columnNumbersExprList[currentColPosition++]).isSame(keySelectedTable)) {
+          return false;
+        }
+      }
+
+      return true;
+    }
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
@@ -283,14 +386,21 @@ public class BucketingSortingReduceSinkO
       if (destTable == null) {
         return null;
       }
+      int numBucketsDestination = destTable.getNumBuckets();
 
       // Get the positions for sorted and bucketed columns
       // For sorted columns, also get the order (ascending/descending) - that should
       // also match for this to be converted to a map-only job.
+      // Get the positions for sorted and bucketed columns
+      // For sorted columns, also get the order (ascending/descending) - that should
+      // also match for this to be converted to a map-only job.
       List<Integer> bucketPositions =
           getBucketPositions(destTable.getBucketCols(), destTable.getCols());
-      List<ObjectPair<Integer, Integer>> sortPositions =
-          getSortPositions(destTable.getSortCols(), destTable.getCols());
+      ObjectPair<List<Integer>, List<Integer>> sortOrderPositions =
+          getSortPositionsOrder(destTable.getSortCols(), destTable.getCols());
+      List<Integer> sortPositions = sortOrderPositions.getFirst();
+      List<Integer> sortOrder = sortOrderPositions.getSecond();
+      boolean useBucketSortPositions = true;
 
       // Only selects and filters are allowed
       Operator<? extends OperatorDesc> op = rsOp;
@@ -298,119 +408,179 @@ public class BucketingSortingReduceSinkO
       // bucketed/sorted columns for the destination table
       List<ExprNodeColumnDesc> sourceTableBucketCols = new ArrayList<ExprNodeColumnDesc>();
       List<ExprNodeColumnDesc> sourceTableSortCols = new ArrayList<ExprNodeColumnDesc>();
+      op = op.getParentOperators().get(0);
 
       while (true) {
-        if (op.getParentOperators().size() > 1) {
-          return null;
-        }
-
-        op = op.getParentOperators().get(0);
         if (!(op instanceof TableScanOperator) &&
             !(op instanceof FilterOperator) &&
-            !(op instanceof SelectOperator)) {
+            !(op instanceof SelectOperator) &&
+            !(op instanceof SMBMapJoinOperator)) {
           return null;
         }
 
-        // nothing to be done for filters - the output schema does not change.
-        if (op instanceof TableScanOperator) {
-          Table srcTable = pGraphContext.getTopToTable().get(op);
-
-          // Find the positions of the bucketed columns in the table corresponding
-          // to the select list.
-          // Consider the following scenario:
-          // T1(key, value1, value2) bucketed/sorted by key into 2 buckets
-          // T2(dummy, key, value1, value2) bucketed/sorted by key into 2 buckets
-          // A query like: insert overwrite table T2 select 1, key, value1, value2 from T1
-          // should be optimized.
-
-          // Start with the destination: T2, bucketed/sorted position is [1]
-          // At the source T1, the column corresponding to that position is [key], which
-          // maps to column [0] of T1, which is also bucketed/sorted into the same
-          // number of buckets
-          List<Integer> newBucketPositions = new ArrayList<Integer>();
-          for (int pos = 0; pos < bucketPositions.size(); pos++) {
-            ExprNodeColumnDesc col = sourceTableBucketCols.get(pos);
-            String colName = col.getColumn();
-            int bucketPos = findColumnPosition(srcTable.getCols(), colName);
-            if (bucketPos < 0) {
-              return null;
-            }
-            newBucketPositions.add(bucketPos);
+        if (op instanceof SMBMapJoinOperator) {
+          // Bucketing and sorting keys should exactly match
+          if (!(bucketPositions.equals(sortPositions))) {
+            return null;
+          }
+          SMBMapJoinOperator smbOp = (SMBMapJoinOperator) op;
+          SMBJoinDesc smbJoinDesc = smbOp.getConf();
+          int posBigTable = smbJoinDesc.getPosBigTable();
+
+          // give up if the join keys don't match the bucketing keys
+          List<ExprNodeDesc> keysBigTable = smbJoinDesc.getKeys().get((byte) posBigTable);
+          if (keysBigTable.size() != bucketPositions.size()) {
+            return null;
           }
 
-          // Find the positions/order of the sorted columns in the table corresponding
-          // to the select list.
-          List<ObjectPair<Integer, Integer>> newSortPositions =
-              new ArrayList<ObjectPair<Integer, Integer>>();
-          for (int pos = 0; pos < sortPositions.size(); pos++) {
-            ExprNodeColumnDesc col = sourceTableSortCols.get(pos);
-            String colName = col.getColumn();
-            int sortPos = findColumnPosition(srcTable.getCols(), colName);
-            if (sortPos < 0) {
-              return null;
-            }
-            newSortPositions.add(
-                new ObjectPair<Integer, Integer>(sortPos, sortPositions.get(pos).getSecond()));
+          if (!validateSMBJoinKeys(smbJoinDesc, sourceTableBucketCols,
+              sourceTableSortCols, sortOrder)) {
+            return null;
           }
 
+          sourceTableBucketCols.clear();
+          sourceTableSortCols.clear();
+          useBucketSortPositions = false;
 
-          if (srcTable.isPartitioned()) {
-            PrunedPartitionList prunedParts = pGraphContext.getOpToPartList().get(op);
-            List<Partition> partitions = prunedParts.getNotDeniedPartns();
-
-            // Support for dynamic partitions can be added later
-            // The following is not optimized:
-            // insert overwrite table T1(ds='1', hr) select key, value, hr from T2 where ds = '1';
-            // where T1 and T2 are bucketed by the same keys and partitioned by ds. hr
-            if ((partitions == null) || (partitions.isEmpty()) || (partitions.size() > 1)) {
+          for (ExprNodeDesc keyBigTable : keysBigTable) {
+            if (!(keyBigTable instanceof ExprNodeColumnDesc)) {
               return null;
             }
-            for (Partition partition : partitions) {
-              if (!checkPartition(partition, newBucketPositions, newSortPositions,
-                  pGraphContext.getFsopToTable().get(fsOp).getNumBuckets())) {
+            sourceTableBucketCols.add((ExprNodeColumnDesc) keyBigTable);
+            sourceTableSortCols.add((ExprNodeColumnDesc) keyBigTable);
+          }
+
+          // since it is a sort-merge join, only follow the big table
+          op = op.getParentOperators().get(posBigTable);
+        } else {
+          // nothing to be done for filters - the output schema does not change.
+          if (op instanceof TableScanOperator) {
+            assert !useBucketSortPositions;
+            Table srcTable = pGraphContext.getTopToTable().get(op);
+
+            // Find the positions of the bucketed columns in the table corresponding
+            // to the select list.
+            // Consider the following scenario:
+            // T1(key, value1, value2) bucketed/sorted by key into 2 buckets
+            // T2(dummy, key, value1, value2) bucketed/sorted by key into 2 buckets
+            // A query like: insert overwrite table T2 select 1, key, value1, value2 from T1
+            // should be optimized.
+
+            // Start with the destination: T2, bucketed/sorted position is [1]
+            // At the source T1, the column corresponding to that position is [key], which
+            // maps to column [0] of T1, which is also bucketed/sorted into the same
+            // number of buckets
+            List<Integer> newBucketPositions = new ArrayList<Integer>();
+            for (int pos = 0; pos < bucketPositions.size(); pos++) {
+              ExprNodeColumnDesc col = sourceTableBucketCols.get(pos);
+              String colName = col.getColumn();
+              int bucketPos = findColumnPosition(srcTable.getCols(), colName);
+              if (bucketPos < 0) {
                 return null;
               }
+              newBucketPositions.add(bucketPos);
             }
 
-            removeReduceSink(rsOp, (TableScanOperator) op, fsOp,
-                partitions.get(0).getSortedPaths());
-            return null;
-          }
-          else {
-            if (!checkTable(srcTable, newBucketPositions, newSortPositions,
-                pGraphContext.getFsopToTable().get(fsOp).getNumBuckets())) {
-              return null;
+            // Find the positions/order of the sorted columns in the table corresponding
+            // to the select list.
+            List<Integer> newSortPositions = new ArrayList<Integer>();
+            for (int pos = 0; pos < sortPositions.size(); pos++) {
+              ExprNodeColumnDesc col = sourceTableSortCols.get(pos);
+              String colName = col.getColumn();
+              int sortPos = findColumnPosition(srcTable.getCols(), colName);
+              if (sortPos < 0) {
+                return null;
+              }
+              newSortPositions.add(sortPos);
             }
 
-            removeReduceSink(rsOp, (TableScanOperator) op, fsOp, srcTable.getSortedPaths());
-            return null;
-          }
-        }
-        // None of the operators is changing the positions
-        else if (op instanceof SelectOperator) {
-          SelectOperator selectOp = (SelectOperator) op;
-          SelectDesc selectDesc = selectOp.getConf();
+            if (srcTable.isPartitioned()) {
+              PrunedPartitionList prunedParts = pGraphContext.getOpToPartList().get(op);
+              List<Partition> partitions = prunedParts.getNotDeniedPartns();
+
+              // Support for dynamic partitions can be added later
+              // The following is not optimized:
+              // insert overwrite table T1(ds='1', hr) select key, value, hr from T2 where ds = '1';
+              // where T1 and T2 are bucketed by the same keys and partitioned by ds. hr
+              if ((partitions == null) || (partitions.isEmpty()) || (partitions.size() > 1)) {
+                return null;
+              }
+              for (Partition partition : partitions) {
+                if (!checkPartition(partition, newBucketPositions, newSortPositions, sortOrder,
+                    numBucketsDestination)) {
+                  return null;
+                }
+              }
 
-          // There may be multiple selects - chose the one closest to the table
-          sourceTableBucketCols.clear();
-          sourceTableSortCols.clear();
+              removeReduceSink(rsOp, (TableScanOperator) op, fsOp,
+                  partitions.get(0).getSortedPaths());
+              return null;
+            }
+            else {
+              if (!checkTable(srcTable, newBucketPositions, newSortPositions, sortOrder,
+                  numBucketsDestination)) {
+                return null;
+              }
 
-          // Only columns can be selected for both sorted and bucketed positions
-          for (int pos : bucketPositions) {
-            ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
-            if (!(selectColList instanceof ExprNodeColumnDesc)) {
+              removeReduceSink(rsOp, (TableScanOperator) op, fsOp, srcTable.getSortedPaths());
               return null;
             }
-            sourceTableBucketCols.add((ExprNodeColumnDesc) selectColList);
           }
+          // None of the operators is changing the positions
+          else if (op instanceof SelectOperator) {
+            SelectOperator selectOp = (SelectOperator) op;
+            SelectDesc selectDesc = selectOp.getConf();
+
+            // Iterate backwards, from the destination table to the top of the tree
+            // Based on the output column names, get the new columns.
+            if (!useBucketSortPositions) {
+              bucketPositions.clear();
+              sortPositions.clear();
+              List<String> outputColumnNames = selectDesc.getOutputColumnNames();
+
+              for (ExprNodeColumnDesc col : sourceTableBucketCols) {
+                String colName = col.getColumn();
+                int colPos = outputColumnNames.indexOf(colName);
+                if (colPos < 0) {
+                  return null;
+                }
+                bucketPositions.add(colPos);
+              }
 
-          for (ObjectPair<Integer, Integer> pos : sortPositions) {
-            ExprNodeDesc selectColList = selectDesc.getColList().get(pos.getFirst());
-            if (!(selectColList instanceof ExprNodeColumnDesc)) {
-              return null;
+              for (ExprNodeColumnDesc col : sourceTableSortCols) {
+                String colName = col.getColumn();
+                int colPos = outputColumnNames.indexOf(colName);
+                if (colPos < 0) {
+                  return null;
+                }
+                sortPositions.add(colPos);
+              }
+            }
+
+            // There may be multiple selects - choose the one closest to the table
+            sourceTableBucketCols.clear();
+            sourceTableSortCols.clear();
+
+            // Only columns can be selected for both sorted and bucketed positions
+            for (int pos : bucketPositions) {
+              ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
+              if (!(selectColList instanceof ExprNodeColumnDesc)) {
+                return null;
+              }
+              sourceTableBucketCols.add((ExprNodeColumnDesc) selectColList);
+            }
+
+            for (int pos : sortPositions) {
+              ExprNodeDesc selectColList = selectDesc.getColList().get(pos);
+              if (!(selectColList instanceof ExprNodeColumnDesc)) {
+                return null;
+              }
+              sourceTableSortCols.add((ExprNodeColumnDesc) selectColList);
             }
-            sourceTableSortCols.add((ExprNodeColumnDesc) selectColList);
+
+            useBucketSortPositions = false;
           }
+          op = op.getParentOperators().get(0);
         }
       }
     }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Fri Apr 12 17:35:19 2013
@@ -1828,6 +1828,7 @@ body
    :
    insertClause
    selectClause
+   lateralView?
    whereClause?
    groupByClause?
    havingClause?
@@ -1836,11 +1837,12 @@ body
    distributeByClause?
    sortByClause?
    window_clause?
-   limitClause? -> ^(TOK_INSERT insertClause?
-                     selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
+   limitClause? -> ^(TOK_INSERT insertClause
+                     selectClause lateralView? whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
                      distributeByClause? sortByClause? window_clause? limitClause?)
    |
    selectClause
+   lateralView?
    whereClause?
    groupByClause?
    havingClause?
@@ -1850,7 +1852,7 @@ body
    sortByClause?
    window_clause?
    limitClause? -> ^(TOK_INSERT ^(TOK_DESTINATION ^(TOK_DIR TOK_TMP_FILE))
-                     selectClause whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
+                     selectClause lateralView? whereClause? groupByClause? havingClause? orderByClause? clusterByClause?
                      distributeByClause? sortByClause? window_clause? limitClause?)
    ;
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java Fri Apr 12 17:35:19 2013
@@ -90,6 +90,8 @@ public class QBParseInfo {
    */
   private final HashMap<String, ArrayList<ASTNode>> aliasToLateralViews;
 
+  private final HashMap<String, ASTNode> destToLateralView;
+
   /* Order by clause */
   private final HashMap<String, ASTNode> destToOrderby;
   private final HashMap<String, Integer> destToLimit;
@@ -111,6 +113,7 @@ public class QBParseInfo {
     nameToDest = new HashMap<String, ASTNode>();
     nameToSample = new HashMap<String, TableSample>();
     exprToColumnAlias = new HashMap<ASTNode, String>();
+    destToLateralView = new HashMap<String, ASTNode>();
     destToSelExpr = new LinkedHashMap<String, ASTNode>();
     destToWhereExpr = new HashMap<String, ASTNode>();
     destToGroupby = new HashMap<String, ASTNode>();
@@ -552,6 +555,9 @@ public class QBParseInfo {
     return nameToSample;
   }
 
+  public HashMap<String, ASTNode> getDestToLateralView() {
+    return destToLateralView;
+  }
 
   protected static enum ClauseType {
     CLUSTER_BY_CLAUSE,

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1467364&r1=1467363&r2=1467364&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Fri Apr 12 17:35:19 2013
@@ -1023,7 +1023,13 @@ public class SemanticAnalyzer extends Ba
                 .getMsg(partition.toString()));
           }
         }
-
+        skipRecursion = false;
+        break;
+      case HiveParser.TOK_LATERAL_VIEW:
+        // TODO: support nested lateral views
+        assert ast.getChildCount() == 1;
+        qb.getParseInfo().getDestToLateralView().put(ctx_1.dest, ast);
+        break;
       default:
         skipRecursion = false;
         break;
@@ -3989,7 +3995,7 @@ public class SemanticAnalyzer extends Ba
   }
 
   @SuppressWarnings({"nls"})
-  private Operator genGroupByPlan1MRMultiReduceGB(List<String> dests, QB qb, Operator input)
+  private Operator genGroupByPlan1ReduceMultiGBY(List<String> dests, QB qb, Operator input)
       throws SemanticException {
 
     QBParseInfo parseInfo = qb.getParseInfo();
@@ -6811,9 +6817,14 @@ public class SemanticAnalyzer extends Ba
   // Return the common distinct expression
   // There should be more than 1 destination, with group bys in all of them.
   private List<ASTNode> getCommonDistinctExprs(QB qb, Operator input) {
-    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     QBParseInfo qbp = qb.getParseInfo();
+    // If a grouping set aggregation or lateral view is present, common processing is not possible
+    if (!qbp.getDestCubes().isEmpty() || !qbp.getDestRollups().isEmpty()
+        || !qbp.getDestToLateralView().isEmpty()) {
+      return null;
+    }
 
+    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     TreeSet<String> ks = new TreeSet<String>();
     ks.addAll(qbp.getClauseNames());
 
@@ -6822,15 +6833,10 @@ public class SemanticAnalyzer extends Ba
       return null;
     }
 
-    List<ExprNodeDesc.ExprNodeDescEqualityWrapper> oldList = null;
+    List<ExprNodeDesc> oldList = null;
     List<ASTNode> oldASTList = null;
 
     for (String dest : ks) {
-      // If a grouping set aggregation is present, common processing is not possible
-      if (!qbp.getDestCubes().isEmpty() || !qbp.getDestRollups().isEmpty()) {
-        return null;
-      }
-
       // If a filter is present, common processing is not possible
       if (qbp.getWhrForClause(dest) != null) {
         return null;
@@ -6847,7 +6853,7 @@ public class SemanticAnalyzer extends Ba
         return null;
       }
 
-      List<ExprNodeDesc.ExprNodeDescEqualityWrapper> currDestList;
+      List<ExprNodeDesc> currDestList;
       try {
         currDestList = getDistinctExprs(qbp, dest, inputRR);
       } catch (SemanticException e) {
@@ -6968,10 +6974,9 @@ public class SemanticAnalyzer extends Ba
   // Groups the clause names into lists so that any two clauses in the same list has the same
   // group by and distinct keys and no clause appears in more than one list. Returns a list of the
   // lists of clauses.
-  private List<List<String>> getCommonGroupByDestGroups(QB qb, Operator input)
-      throws SemanticException {
+  private List<List<String>> getCommonGroupByDestGroups(QB qb,
+      Map<String, Operator<? extends OperatorDesc>> inputs) throws SemanticException {
 
-    RowResolver inputRR = opParseCtx.get(input).getRowResolver();
     QBParseInfo qbp = qb.getParseInfo();
 
     TreeSet<String> ks = new TreeSet<String>();
@@ -6989,29 +6994,31 @@ public class SemanticAnalyzer extends Ba
       return commonGroupByDestGroups;
     }
 
-    List<List<ExprNodeDesc.ExprNodeDescEqualityWrapper>> sprayKeyLists =
-        new ArrayList<List<ExprNodeDesc.ExprNodeDescEqualityWrapper>>(ks.size());
+    List<Operator<? extends OperatorDesc>> inputOperators =
+        new ArrayList<Operator<? extends OperatorDesc>>(ks.size());
+    List<List<ExprNodeDesc>> sprayKeyLists = new ArrayList<List<ExprNodeDesc>>(ks.size());
 
     // Iterate over each clause
     for (String dest : ks) {
-
-      List<ExprNodeDesc.ExprNodeDescEqualityWrapper> sprayKeys =
-          getDistinctExprs(qbp, dest, inputRR);
+      Operator input = inputs.get(dest);
+      RowResolver inputRR = opParseCtx.get(input).getRowResolver();
+      List<ExprNodeDesc> sprayKeys = getDistinctExprs(qbp, dest, inputRR);
 
       // Add the group by expressions
       List<ASTNode> grpByExprs = getGroupByForClause(qbp, dest);
       for (ASTNode grpByExpr : grpByExprs) {
-        ExprNodeDesc.ExprNodeDescEqualityWrapper grpByExprWrapper =
-            new ExprNodeDesc.ExprNodeDescEqualityWrapper(genExprNodeDesc(grpByExpr, inputRR));
-        if (!sprayKeys.contains(grpByExprWrapper)) {
-          sprayKeys.add(grpByExprWrapper);
+        ExprNodeDesc exprDesc = genExprNodeDesc(grpByExpr, inputRR);
+        if (ExprNodeDescUtils.indexOf(exprDesc, sprayKeys) < 0) {
+          sprayKeys.add(exprDesc);
         }
       }
 
       // Loop through each of the lists of exprs, looking for a match
       boolean found = false;
       for (int i = 0; i < sprayKeyLists.size(); i++) {
-
+        if (!input.equals(inputOperators.get(i))) {
+          continue;
+        }
         if (!matchExprLists(sprayKeyLists.get(i), sprayKeys)) {
           continue;
         }
@@ -7024,6 +7031,7 @@ public class SemanticAnalyzer extends Ba
 
       // No match was found, so create new entries
       if (!found) {
+        inputOperators.add(input);
         sprayKeyLists.add(sprayKeys);
         List<String> destGroup = new ArrayList<String>();
         destGroup.add(dest);
@@ -7035,15 +7043,13 @@ public class SemanticAnalyzer extends Ba
   }
 
   // Returns whether or not two lists contain the same elements independent of order
-  private boolean matchExprLists(List<ExprNodeDesc.ExprNodeDescEqualityWrapper> list1,
-      List<ExprNodeDesc.ExprNodeDescEqualityWrapper> list2) {
+  private boolean matchExprLists(List<ExprNodeDesc> list1, List<ExprNodeDesc> list2) {
 
     if (list1.size() != list2.size()) {
       return false;
     }
-
-    for (ExprNodeDesc.ExprNodeDescEqualityWrapper exprNodeDesc : list1) {
-      if (!list2.contains(exprNodeDesc)) {
+    for (ExprNodeDesc exprNodeDesc : list1) {
+      if (ExprNodeDescUtils.indexOf(exprNodeDesc, list2) < 0) {
         return false;
       }
     }
@@ -7051,23 +7057,20 @@ public class SemanticAnalyzer extends Ba
     return true;
   }
 
-  // Returns a list of the distinct exprs for a given clause name as
-  // ExprNodeDesc.ExprNodeDescEqualityWrapper without duplicates
-  private List<ExprNodeDesc.ExprNodeDescEqualityWrapper>
-      getDistinctExprs(QBParseInfo qbp, String dest, RowResolver inputRR) throws SemanticException {
+  // Returns a list of the distinct exprs without duplicates for a given clause name
+  private List<ExprNodeDesc> getDistinctExprs(QBParseInfo qbp, String dest, RowResolver inputRR)
+      throws SemanticException {
 
     List<ASTNode> distinctAggExprs = qbp.getDistinctFuncExprsForClause(dest);
-    List<ExprNodeDesc.ExprNodeDescEqualityWrapper> distinctExprs =
-        new ArrayList<ExprNodeDesc.ExprNodeDescEqualityWrapper>();
+    List<ExprNodeDesc> distinctExprs = new ArrayList<ExprNodeDesc>();
 
     for (ASTNode distinctAggExpr : distinctAggExprs) {
       // 0 is function name
       for (int i = 1; i < distinctAggExpr.getChildCount(); i++) {
         ASTNode parameter = (ASTNode) distinctAggExpr.getChild(i);
-        ExprNodeDesc.ExprNodeDescEqualityWrapper distinctExpr =
-            new ExprNodeDesc.ExprNodeDescEqualityWrapper(genExprNodeDesc(parameter, inputRR));
-        if (!distinctExprs.contains(distinctExpr)) {
-          distinctExprs.add(distinctExpr);
+        ExprNodeDesc expr = genExprNodeDesc(parameter, inputRR);
+        if (ExprNodeDescUtils.indexOf(expr, distinctExprs) < 0) {
+          distinctExprs.add(expr);
         }
       }
     }
@@ -7096,6 +7099,7 @@ public class SemanticAnalyzer extends Ba
     QBParseInfo qbp = qb.getParseInfo();
 
     TreeSet<String> ks = new TreeSet<String>(qbp.getClauseNames());
+    Map<String, Operator<? extends OperatorDesc>> inputs = createInputForDests(qb, input, ks);
     // For multi-group by with the same distinct, we ignore all user hints
     // currently. It doesnt matter whether he has asked to do
     // map-side aggregation or not. Map side aggregation is turned off
@@ -7148,7 +7152,7 @@ public class SemanticAnalyzer extends Ba
       // expressions, otherwise treat all the expressions as a single group
       if (conf.getBoolVar(HiveConf.ConfVars.HIVEMULTIGROUPBYSINGLEREDUCER)) {
         try {
-          commonGroupByDestGroups = getCommonGroupByDestGroups(qb, curr);
+          commonGroupByDestGroups = getCommonGroupByDestGroups(qb, inputs);
         } catch (SemanticException e) {
           LOG.error("Failed to group clauses by common spray keys.", e);
         }
@@ -7168,6 +7172,8 @@ public class SemanticAnalyzer extends Ba
           }
 
           String firstDest = commonGroupByDestGroup.get(0);
+          input = inputs.get(firstDest);
+
           // Constructs a standard group by plan if:
           // There is no other subquery with the same group by/distinct keys or
           // (There are no aggregations in a representative query for the group and
@@ -7182,7 +7188,7 @@ public class SemanticAnalyzer extends Ba
 
             // Go over all the destination tables
             for (String dest : commonGroupByDestGroup) {
-              curr = input;
+              curr = inputs.get(dest);
 
               if (qbp.getWhrForClause(dest) != null) {
                 curr = genFilterPlan(dest, qb, curr);
@@ -7215,7 +7221,7 @@ public class SemanticAnalyzer extends Ba
               curr = genPostGroupByBodyPlan(curr, dest, qb);
             }
           } else {
-            curr = genGroupByPlan1MRMultiReduceGB(commonGroupByDestGroup, qb, input);
+            curr = genGroupByPlan1ReduceMultiGBY(commonGroupByDestGroup, qb, input);
           }
         }
       }
@@ -7228,6 +7234,16 @@ public class SemanticAnalyzer extends Ba
     return curr;
   }
 
+  private Map<String, Operator<? extends OperatorDesc>> createInputForDests(QB qb,
+      Operator<? extends OperatorDesc> input, Set<String> dests) throws SemanticException {
+    Map<String, Operator<? extends OperatorDesc>> inputs =
+        new HashMap<String, Operator<? extends OperatorDesc>>();
+    for (String dest : dests) {
+      inputs.put(dest, genLateralViewPlanForDest(dest, qb, input));
+    }
+    return inputs;
+  }
+
   private Operator genPostGroupByBodyPlan(Operator curr, String dest, QB qb)
       throws SemanticException {
 
@@ -8037,71 +8053,7 @@ public class SemanticAnalyzer extends Ba
           // -> LateralViewJoinOperator
           //
 
-          RowResolver lvForwardRR = new RowResolver();
-          RowResolver source = opParseCtx.get(op).getRowResolver();
-          for (ColumnInfo col : source.getColumnInfos()) {
-            if (col.getIsVirtualCol() && col.isHiddenVirtualCol()) {
-              continue;
-            }
-            String[] tabCol = source.reverseLookup(col.getInternalName());
-            lvForwardRR.put(tabCol[0], tabCol[1], col);
-          }
-
-          Operator lvForward = putOpInsertMap(OperatorFactory.getAndMakeChild(
-              new LateralViewForwardDesc(), new RowSchema(lvForwardRR.getColumnInfos()),
-              op), lvForwardRR);
-
-          // The order in which the two paths are added is important. The
-          // lateral view join operator depends on having the select operator
-          // give it the row first.
-
-          // Get the all path by making a select(*).
-          RowResolver allPathRR = opParseCtx.get(lvForward).getRowResolver();
-          // Operator allPath = op;
-          Operator allPath = putOpInsertMap(OperatorFactory.getAndMakeChild(
-              new SelectDesc(true), new RowSchema(allPathRR.getColumnInfos()),
-              lvForward), allPathRR);
-          // Get the UDTF Path
-          QB blankQb = new QB(null, null, false);
-          Operator udtfPath = genSelectPlan((ASTNode) lateralViewTree
-              .getChild(0), blankQb, lvForward);
-          // add udtf aliases to QB
-          for (String udtfAlias : blankQb.getAliases()) {
-            qb.addAlias(udtfAlias);
-          }
-          RowResolver udtfPathRR = opParseCtx.get(udtfPath).getRowResolver();
-
-          // Merge the two into the lateral view join
-          // The cols of the merged result will be the combination of both the
-          // cols of the UDTF path and the cols of the all path. The internal
-          // names have to be changed to avoid conflicts
-
-          RowResolver lateralViewRR = new RowResolver();
-          ArrayList<String> outputInternalColNames = new ArrayList<String>();
-
-          LVmergeRowResolvers(allPathRR, lateralViewRR, outputInternalColNames);
-          LVmergeRowResolvers(udtfPathRR, lateralViewRR, outputInternalColNames);
-
-          // For PPD, we need a column to expression map so that during the walk,
-          // the processor knows how to transform the internal col names.
-          // Following steps are dependant on the fact that we called
-          // LVmerge.. in the above order
-          Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
-
-          int i = 0;
-          for (ColumnInfo c : allPathRR.getColumnInfos()) {
-            String internalName = getColumnInternalName(i);
-            i++;
-            colExprMap.put(internalName,
-                new ExprNodeColumnDesc(c.getType(), c.getInternalName(),
-                    c.getTabAlias(), c.getIsVirtualCol()));
-          }
-
-          Operator lateralViewJoin = putOpInsertMap(OperatorFactory
-              .getAndMakeChild(new LateralViewJoinDesc(outputInternalColNames),
-                  new RowSchema(lateralViewRR.getColumnInfos()), allPath,
-                  udtfPath), lateralViewRR);
-          lateralViewJoin.setColumnExprMap(colExprMap);
+          Operator lateralViewJoin = genLateralViewPlan(qb, op, lateralViewTree);
           op = lateralViewJoin;
         }
         e.setValue(op);
@@ -8109,6 +8061,85 @@ public class SemanticAnalyzer extends Ba
     }
   }
 
+  private Operator genLateralViewPlanForDest(String dest, QB qb, Operator op)
+      throws SemanticException {
+    ASTNode lateralViewTree = qb.getParseInfo().getDestToLateralView().get(dest);
+    if (lateralViewTree != null) {
+      return genLateralViewPlan(qb, op, lateralViewTree);
+    }
+    return op;
+  }
+
+  private Operator genLateralViewPlan(QB qb, Operator op, ASTNode lateralViewTree)
+      throws SemanticException {
+    RowResolver lvForwardRR = new RowResolver();
+    RowResolver source = opParseCtx.get(op).getRowResolver();
+    for (ColumnInfo col : source.getColumnInfos()) {
+      if (col.getIsVirtualCol() && col.isHiddenVirtualCol()) {
+        continue;
+      }
+      String[] tabCol = source.reverseLookup(col.getInternalName());
+      lvForwardRR.put(tabCol[0], tabCol[1], col);
+    }
+
+    Operator lvForward = putOpInsertMap(OperatorFactory.getAndMakeChild(
+        new LateralViewForwardDesc(), new RowSchema(lvForwardRR.getColumnInfos()),
+        op), lvForwardRR);
+
+    // The order in which the two paths are added is important. The
+    // lateral view join operator depends on having the select operator
+    // give it the row first.
+
+    // Get the all path by making a select(*).
+    RowResolver allPathRR = opParseCtx.get(lvForward).getRowResolver();
+    // Operator allPath = op;
+    Operator allPath = putOpInsertMap(OperatorFactory.getAndMakeChild(
+        new SelectDesc(true), new RowSchema(allPathRR.getColumnInfos()),
+        lvForward), allPathRR);
+    // Get the UDTF Path
+    QB blankQb = new QB(null, null, false);
+    Operator udtfPath = genSelectPlan((ASTNode) lateralViewTree
+        .getChild(0), blankQb, lvForward);
+    // add udtf aliases to QB
+    for (String udtfAlias : blankQb.getAliases()) {
+      qb.addAlias(udtfAlias);
+    }
+    RowResolver udtfPathRR = opParseCtx.get(udtfPath).getRowResolver();
+
+    // Merge the two into the lateral view join
+    // The cols of the merged result will be the combination of both the
+    // cols of the UDTF path and the cols of the all path. The internal
+    // names have to be changed to avoid conflicts
+
+    RowResolver lateralViewRR = new RowResolver();
+    ArrayList<String> outputInternalColNames = new ArrayList<String>();
+
+    LVmergeRowResolvers(allPathRR, lateralViewRR, outputInternalColNames);
+    LVmergeRowResolvers(udtfPathRR, lateralViewRR, outputInternalColNames);
+
+    // For PPD, we need a column to expression map so that during the walk,
+    // the processor knows how to transform the internal col names.
+    // The following steps depend on the fact that we called
+    // LVmergeRowResolvers in the above order (all path, then UDTF path).
+    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
+
+    int i = 0;
+    for (ColumnInfo c : allPathRR.getColumnInfos()) {
+      String internalName = getColumnInternalName(i);
+      i++;
+      colExprMap.put(internalName,
+          new ExprNodeColumnDesc(c.getType(), c.getInternalName(),
+              c.getTabAlias(), c.getIsVirtualCol()));
+    }
+
+    Operator lateralViewJoin = putOpInsertMap(OperatorFactory
+        .getAndMakeChild(new LateralViewJoinDesc(outputInternalColNames),
+            new RowSchema(lateralViewRR.getColumnInfos()), allPath,
+            udtfPath), lateralViewRR);
+    lateralViewJoin.setColumnExprMap(colExprMap);
+    return lateralViewJoin;
+  }
+
   /**
    * A helper function that gets all the columns and respective aliases in the
    * source and puts them into dest. It renames the internal names of the
@@ -8704,7 +8735,7 @@ public class SemanticAnalyzer extends Ba
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("\n" + Operator.toString(pCtx.getTopOps().values()));
+      LOG.debug("Before logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
     }
 
     Optimizer optm = new Optimizer();
@@ -8713,7 +8744,7 @@ public class SemanticAnalyzer extends Ba
     pCtx = optm.optimize();
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("\n" + Operator.toString(pCtx.getTopOps().values()));
+      LOG.debug("After logical optimization\n" + Operator.toString(pCtx.getTopOps().values()));
     }
 
     // Generate column access stats if required - wait until column pruning takes place