Posted to commits@hive.apache.org by he...@apache.org on 2012/09/30 22:41:03 UTC

svn commit: r1392105 [3/7] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimiz...

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseContext.java Sun Sep 30 20:41:01 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Gr
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo;
@@ -86,6 +87,11 @@ public class ParseContext {
   private Map<GroupByOperator, Set<String>> groupOpToInputTables;
   private Map<String, PrunedPartitionList> prunedPartitions;
 
+  //a map from non-map-side group by pattern (RS-GBY) to map-side group by pattern (GBY-RS-GBY)
+  Map<ReduceSinkOperator, GroupByOperator> groupbyNonMapSide2MapSide;
+  //a map from map-side group by pattern (GBY-RS-GBY) to non-map-side group by pattern (RS-GBY)
+  Map<GroupByOperator, ReduceSinkOperator> groupbyMapSide2NonMapSide;
+
   /**
    * The lineage information.
    */
@@ -169,7 +175,9 @@ public class ParseContext {
       GlobalLimitCtx globalLimitCtx,
       HashMap<String, SplitSample> nameToSplitSample,
       HashSet<ReadEntity> semanticInputs, List<Task<? extends Serializable>> rootTasks,
-      Map<TableScanOperator, ExprNodeDesc> opToSkewedPruner) {
+      Map<TableScanOperator, ExprNodeDesc> opToSkewedPruner,
+      Map<ReduceSinkOperator, GroupByOperator> groupbyNonMapSide2MapSide,
+      Map<GroupByOperator, ReduceSinkOperator> groupbyMapSide2NonMapSide) {
     this.conf = conf;
     this.qb = qb;
     this.ast = ast;
@@ -196,6 +204,8 @@ public class ParseContext {
     this.semanticInputs = semanticInputs;
     this.rootTasks = rootTasks;
     this.opToSkewedPruner = opToSkewedPruner;
+    this.groupbyNonMapSide2MapSide = groupbyNonMapSide2MapSide;
+    this.groupbyMapSide2NonMapSide = groupbyMapSide2NonMapSide;
   }
 
   /**
@@ -538,7 +548,7 @@ public class ParseContext {
   }
 
   public void replaceRootTask(Task<? extends Serializable> rootTask,
-                              List<? extends Task<? extends Serializable>> tasks) {
+      List<? extends Task<? extends Serializable>> tasks) {
     this.rootTasks.remove(rootTask);
     this.rootTasks.addAll(tasks);
   }
@@ -576,4 +586,11 @@ public class ParseContext {
     this.opToSkewedPruner = opToSkewedPruner;
   }
 
+  public Map<ReduceSinkOperator, GroupByOperator> getGroupbyNonMapSide2MapSide() {
+    return groupbyNonMapSide2MapSide;
+  }
+
+  public Map<GroupByOperator, ReduceSinkOperator> getGroupbyMapSide2NonMapSide() {
+    return groupbyMapSide2NonMapSide;
+  }
 }
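
The two maps above are populated as mutual inverses (see the SemanticAnalyzer
hunk further down): the ReduceSinkOperator of an RS-GBY pattern maps to the
GroupByOperator heading its GBY-RS-GBY rewrite, and vice versa. A minimal
lookup sketch, assuming a ParseContext pctx and a ReduceSinkOperator rs taken
from an RS-GBY pattern (neither name appears in this commit):

    GroupByOperator mapSideGby = pctx.getGroupbyNonMapSide2MapSide().get(rs);
    if (mapSideGby != null) {
      // Because the two maps are inverses, the reverse lookup yields rs itself.
      ReduceSinkOperator back =
          pctx.getGroupbyMapSide2NonMapSide().get(mapSideGby);
      assert back == rs;
    }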

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Sun Sep 30 20:41:01 2012
@@ -188,7 +188,7 @@ public class SemanticAnalyzer extends Ba
   private List<LoadTableDesc> loadTableWork;
   private List<LoadFileDesc> loadFileWork;
   private Map<JoinOperator, QBJoinTree> joinContext;
-  private final HashMap<TableScanOperator, Table> topToTable;
+  private HashMap<TableScanOperator, Table> topToTable;
   private QB qb;
   private ASTNode ast;
   private int destTableId;
@@ -210,6 +210,11 @@ public class SemanticAnalyzer extends Ba
   private final UnparseTranslator unparseTranslator;
   private final GlobalLimitCtx globalLimitCtx = new GlobalLimitCtx();
 
+  // a map from non-map-side group by pattern (RS-GBY) to map-side group by pattern (GBY-RS-GBY)
+  Map<ReduceSinkOperator, GroupByOperator> groupbyNonMapSide2MapSide;
+  // a map from map-side group by pattern (GBY-RS-GBY) to non-map-side group by pattern (RS-GBY)
+  Map<GroupByOperator, ReduceSinkOperator> groupbyMapSide2NonMapSide;
+
   //prefix for column names auto generated by hive
   private final String autogenColAliasPrfxLbl;
   private final boolean autogenColAliasPrfxIncludeFuncName;
@@ -248,6 +253,8 @@ public class SemanticAnalyzer extends Ba
                          HiveConf.ConfVars.HIVE_AUTOGEN_COLUMNALIAS_PREFIX_INCLUDEFUNCNAME);
     queryProperties = new QueryProperties();
     opToSkewedPruner = new HashMap<TableScanOperator, ExprNodeDesc>();
+    groupbyNonMapSide2MapSide = new HashMap<ReduceSinkOperator, GroupByOperator>();
+    groupbyMapSide2NonMapSide = new HashMap<GroupByOperator, ReduceSinkOperator>();
   }
 
   @Override
@@ -266,6 +273,9 @@ public class SemanticAnalyzer extends Ba
     opParseCtx.clear();
     groupOpToInputTables.clear();
     prunedPartitions.clear();
+    topToTable.clear();
+    groupbyNonMapSide2MapSide.clear();
+    groupbyMapSide2NonMapSide.clear();
   }
 
   public void init(ParseContext pctx) {
@@ -273,6 +283,7 @@ public class SemanticAnalyzer extends Ba
     opToPartList = pctx.getOpToPartList();
     opToSamplePruner = pctx.getOpToSamplePruner();
     topOps = pctx.getTopOps();
+    topToTable = pctx.getTopToTable();
     topSelOps = pctx.getTopSelOps();
     opParseCtx = pctx.getOpParseCtx();
     loadTableWork = pctx.getLoadTableWork();
@@ -288,6 +299,8 @@ public class SemanticAnalyzer extends Ba
     prunedPartitions = pctx.getPrunedPartitions();
     fetchTask = pctx.getFetchTask();
     setLineageInfo(pctx.getLineageInfo());
+    groupbyNonMapSide2MapSide = pctx.getGroupbyNonMapSide2MapSide();
+    groupbyMapSide2NonMapSide = pctx.getGroupbyMapSide2NonMapSide();
   }
 
   public ParseContext getParseContext() {
@@ -295,7 +308,8 @@ public class SemanticAnalyzer extends Ba
         topSelOps, opParseCtx, joinContext, topToTable, loadTableWork,
         loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
-        opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToSkewedPruner);
+        opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToSkewedPruner,
+        groupbyNonMapSide2MapSide, groupbyMapSide2NonMapSide);
   }
 
   @SuppressWarnings("nls")
@@ -2907,7 +2921,7 @@ public class SemanticAnalyzer extends Ba
         colExprMap);
 
     List<List<Integer>> distinctColIndices = getDistinctColIndicesForReduceSink(parseInfo, dest,
-        reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames);
+        reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames, colExprMap);
 
     ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
     HashMap<String, ASTNode> aggregationTrees = parseInfo
@@ -2915,7 +2929,7 @@ public class SemanticAnalyzer extends Ba
 
     if (!mapAggrDone) {
       getReduceValuesForReduceSinkNoMapAgg(parseInfo, dest, reduceSinkInputRowResolver,
-          reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues);
+          reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues, colExprMap);
     } else {
       // Put partial aggregation results in reduceValues
       int inputField = reduceKeys.size();
@@ -2924,14 +2938,16 @@ public class SemanticAnalyzer extends Ba
 
         TypeInfo type = reduceSinkInputRowResolver.getColumnInfos().get(
             inputField).getType();
-        reduceValues.add(new ExprNodeColumnDesc(type,
-            getColumnInternalName(inputField), "", false));
+        ExprNodeDesc expr = new ExprNodeColumnDesc(type,
+            getColumnInternalName(inputField), "", false);
+        reduceValues.add(expr);
         inputField++;
         outputValueColumnNames.add(getColumnInternalName(reduceValues.size() - 1));
         String field = Utilities.ReduceField.VALUE.toString() + "."
             + getColumnInternalName(reduceValues.size() - 1);
-        reduceSinkOutputRowResolver.putExpression(entry.getValue(),
-            new ColumnInfo(field, type, null, false));
+        ColumnInfo colInfo = new ColumnInfo(field, type, null, false);
+        reduceSinkOutputRowResolver.putExpression(entry.getValue(), colInfo);
+        colExprMap.put(colInfo.getInternalName(), expr);
       }
     }
 
@@ -2976,7 +2992,8 @@ public class SemanticAnalyzer extends Ba
 
   private List<List<Integer>> getDistinctColIndicesForReduceSink(QBParseInfo parseInfo, String dest,
       ArrayList<ExprNodeDesc> reduceKeys, RowResolver reduceSinkInputRowResolver,
-      RowResolver reduceSinkOutputRowResolver, List<String> outputKeyColumnNames)
+      RowResolver reduceSinkOutputRowResolver, List<String> outputKeyColumnNames,
+      Map<String, ExprNodeDesc> colExprMap)
       throws SemanticException {
 
     List<List<Integer>> distinctColIndices = new ArrayList<List<Integer>>();
@@ -3015,6 +3032,7 @@ public class SemanticAnalyzer extends Ba
           ColumnInfo colInfo = new ColumnInfo(field, expr.getTypeInfo(), null, false);
           reduceSinkOutputRowResolver.putExpression(parameter, colInfo);
           numExprs++;
+          colExprMap.put(colInfo.getInternalName(), expr);
         }
         distinctColIndices.add(distinctIndices);
       }
@@ -3025,7 +3043,8 @@ public class SemanticAnalyzer extends Ba
 
   private void getReduceValuesForReduceSinkNoMapAgg(QBParseInfo parseInfo, String dest,
       RowResolver reduceSinkInputRowResolver, RowResolver reduceSinkOutputRowResolver,
-      List<String> outputValueColumnNames, ArrayList<ExprNodeDesc> reduceValues)
+      List<String> outputValueColumnNames, ArrayList<ExprNodeDesc> reduceValues,
+      Map<String, ExprNodeDesc> colExprMap)
       throws SemanticException {
     HashMap<String, ASTNode> aggregationTrees = parseInfo
         .getAggregationExprsForClause(dest);
@@ -3037,15 +3056,16 @@ public class SemanticAnalyzer extends Ba
       for (int i = 1; i < value.getChildCount(); i++) {
         ASTNode parameter = (ASTNode) value.getChild(i);
         if (reduceSinkOutputRowResolver.getExpression(parameter) == null) {
-          reduceValues.add(genExprNodeDesc(parameter,
-              reduceSinkInputRowResolver));
+          ExprNodeDesc expr = genExprNodeDesc(parameter, reduceSinkInputRowResolver);
+          reduceValues.add(expr);
           outputValueColumnNames
               .add(getColumnInternalName(reduceValues.size() - 1));
           String field = Utilities.ReduceField.VALUE.toString() + "."
               + getColumnInternalName(reduceValues.size() - 1);
-          reduceSinkOutputRowResolver.putExpression(parameter, new ColumnInfo(field,
-              reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null,
-              false));
+          ColumnInfo colInfo = new ColumnInfo(field,
+              reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null, false);
+          reduceSinkOutputRowResolver.putExpression(parameter, colInfo);
+          colExprMap.put(colInfo.getInternalName(), expr);
         }
       }
     }
@@ -3076,7 +3096,7 @@ public class SemanticAnalyzer extends Ba
         colExprMap);
 
     List<List<Integer>> distinctColIndices = getDistinctColIndicesForReduceSink(parseInfo, dest,
-        reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames);
+        reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames, colExprMap);
 
     ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
 
@@ -3085,7 +3105,7 @@ public class SemanticAnalyzer extends Ba
     for (String destination : dests) {
 
       getReduceValuesForReduceSinkNoMapAgg(parseInfo, destination, reduceSinkInputRowResolver,
-          reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues);
+          reduceSinkOutputRowResolver, outputValueColumnNames, reduceValues, colExprMap);
 
       // Need to pass all of the columns used in the where clauses as reduce values
       ASTNode whereClause = parseInfo.getWhrForClause(destination);
@@ -3095,15 +3115,18 @@ public class SemanticAnalyzer extends Ba
         for (int i = 0; i < columnExprs.size(); i++) {
           ASTNode parameter = columnExprs.get(i);
           if (reduceSinkOutputRowResolver.getExpression(parameter) == null) {
-            reduceValues.add(genExprNodeDesc(parameter,
-                reduceSinkInputRowResolver));
+            ExprNodeDesc expr = genExprNodeDesc(parameter,
+                reduceSinkInputRowResolver);
+            reduceValues.add(expr);
             outputValueColumnNames
                 .add(getColumnInternalName(reduceValues.size() - 1));
             String field = Utilities.ReduceField.VALUE.toString() + "."
                 + getColumnInternalName(reduceValues.size() - 1);
-            reduceSinkOutputRowResolver.putExpression(parameter, new ColumnInfo(field,
+            ColumnInfo colInfo = new ColumnInfo(field,
                 reduceValues.get(reduceValues.size() - 1).getTypeInfo(), null,
-                false));
+                false);
+            reduceSinkOutputRowResolver.putExpression(parameter, colInfo);
+            colExprMap.put(colInfo.getInternalName(), expr);
           }
         }
       }
@@ -3200,13 +3223,16 @@ public class SemanticAnalyzer extends Ba
       ASTNode t = entry.getValue();
       TypeInfo typeInfo = reduceSinkInputRowResolver2.getExpression(t)
           .getType();
-      reduceValues.add(new ExprNodeColumnDesc(typeInfo, field, "", false));
+      ExprNodeColumnDesc inputExpr = new ExprNodeColumnDesc(typeInfo, field,
+          "", false);
+      reduceValues.add(inputExpr);
       inputField++;
       String col = getColumnInternalName(reduceValues.size() - 1);
       outputColumnNames.add(col);
-      reduceSinkOutputRowResolver2.putExpression(t, new ColumnInfo(
-          Utilities.ReduceField.VALUE.toString() + "." + col, typeInfo, "",
-          false));
+      ColumnInfo colInfo = new ColumnInfo(Utilities.ReduceField.VALUE.toString()
+          + "." + col, typeInfo, "", false);
+      reduceSinkOutputRowResolver2.putExpression(t, colInfo);
+      colExprMap.put(colInfo.getInternalName(), inputExpr);
     }
 
     ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
@@ -5918,6 +5944,7 @@ public class SemanticAnalyzer extends Ba
               reduceValues.size() - 1).getTypeInfo(), "", false);
           reduceSinkOutputRowResolver.putExpression(grpbyExpr, colInfo);
           outputColumnNames.add(getColumnInternalName(reduceValues.size() - 1));
+          colExprMap.put(colInfo.getInternalName(), grpByExprNode);
         }
       }
 
@@ -5944,6 +5971,7 @@ public class SemanticAnalyzer extends Ba
             reduceSinkOutputRowResolver.putExpression(paraExpr, colInfo);
             outputColumnNames
                 .add(getColumnInternalName(reduceValues.size() - 1));
+            colExprMap.put(colInfo.getInternalName(), paraExprNode);
           }
         }
       }
@@ -6182,7 +6210,23 @@ public class SemanticAnalyzer extends Ba
                 curr = insertSelectAllPlanForGroupBy(curr);
                 if (conf.getBoolVar(HiveConf.ConfVars.HIVEMAPSIDEAGGREGATE)) {
                   if (!conf.getBoolVar(HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
+                    Operator rsopInNonMapSidePattern = null;
+                    Operator mapSideGroupBy = null;
+                    if (conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) {
+                      Operator nonMapSidePattern = genGroupByPlan1MR(dest, qb, curr);
+                      rsopInNonMapSidePattern = (Operator) nonMapSidePattern
+                          .getParentOperators().get(0);
+                      curr.getChildOperators().remove(rsopInNonMapSidePattern);
+                    }
                     curr = genGroupByPlanMapAggr1MR(dest, qb, curr);
+                    mapSideGroupBy = (Operator) ((Operator) curr.getParentOperators().get(0))
+                        .getParentOperators().get(0);
+                    if (conf.getBoolVar(HiveConf.ConfVars.HIVEOPTCORRELATION)) {
+                      groupbyNonMapSide2MapSide.put((ReduceSinkOperator) rsopInNonMapSidePattern,
+                          (GroupByOperator) mapSideGroupBy);
+                      groupbyMapSide2NonMapSide.put((GroupByOperator) mapSideGroupBy,
+                          (ReduceSinkOperator) rsopInNonMapSidePattern);
+                    }
                   } else {
                     curr = genGroupByPlanMapAggr2MR(dest, qb, curr);
                   }
@@ -7595,7 +7639,8 @@ public class SemanticAnalyzer extends Ba
         opToPartList, topOps, topSelOps, opParseCtx, joinContext, topToTable,
         loadTableWork, loadFileWork, ctx, idToTableNameMap, destTableId, uCtx,
         listMapJoinOpsNoReducer, groupOpToInputTables, prunedPartitions,
-        opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToSkewedPruner);
+        opToSamplePruner, globalLimitCtx, nameToSplitSample, inputs, rootTasks, opToSkewedPruner,
+        groupbyNonMapSide2MapSide, groupbyMapSide2NonMapSide);
 
     Optimizer optm = new Optimizer();
     optm.setPctx(pCtx);
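
The recurring change in the SemanticAnalyzer hunks above is colExprMap
bookkeeping: every ReduceSink output column created in these helpers now
records its internal name against the expression that produces it. A condensed
sketch of the invariant, with placeholder names and a placeholder type (none
of these literals come from this commit):

    Map<String, ExprNodeDesc> colExprMap = new HashMap<String, ExprNodeDesc>();
    ExprNodeDesc expr = new ExprNodeColumnDesc(
        TypeInfoFactory.stringTypeInfo, "_col0", "", false);
    ColumnInfo colInfo = new ColumnInfo("VALUE._col0",
        TypeInfoFactory.stringTypeInfo, null, false);
    // Each emitted column can now be traced back to its source expression.
    colExprMap.put(colInfo.getInternalName(), expr);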

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseReduceSinkDesc.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * BaseReduceSinkDesc.
+ *
+ */
+@Explain(displayName = "Base Reduce Output Operator")
+public class BaseReduceSinkDesc extends AbstractOperatorDesc {
+  private static final long serialVersionUID = 1L;
+  /**
+   * Key columns are passed to reducer in the "key".
+   */
+  protected ArrayList<ExprNodeDesc> keyCols;
+  protected ArrayList<String> outputKeyColumnNames;
+  protected List<List<Integer>> distinctColumnIndices;
+  /**
+   * Value columns are passed to reducer in the "value".
+   */
+  protected ArrayList<ExprNodeDesc> valueCols;
+  protected ArrayList<java.lang.String> outputValueColumnNames;
+  /**
+   * Describe how to serialize the key.
+   */
+  protected TableDesc keySerializeInfo;
+  /**
+   * Describe how to serialize the value.
+   */
+  protected TableDesc valueSerializeInfo;
+
+  /**
+   * The tag for this reducesink descriptor.
+   */
+  protected int tag;
+
+  /**
+   * Number of distribution keys.
+   */
+  protected int numDistributionKeys;
+
+  /**
+   * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language).
+   * Partition columns decide the reducer that the current row goes to.
+   * Partition columns are not passed to reducer.
+   */
+  protected ArrayList<ExprNodeDesc> partitionCols;
+
+  protected int numReducers;
+
+  public BaseReduceSinkDesc() {
+  }
+
+  public ArrayList<String> getOutputKeyColumnNames() {
+    return outputKeyColumnNames;
+  }
+
+  public void setOutputKeyColumnNames(
+      ArrayList<String> outputKeyColumnNames) {
+    this.outputKeyColumnNames = outputKeyColumnNames;
+  }
+
+  public ArrayList<String> getOutputValueColumnNames() {
+    return outputValueColumnNames;
+  }
+
+  public void setOutputValueColumnNames(
+      ArrayList<String> outputValueColumnNames) {
+    this.outputValueColumnNames = outputValueColumnNames;
+  }
+
+  @Explain(displayName = "key expressions")
+  public ArrayList<ExprNodeDesc> getKeyCols() {
+    return keyCols;
+  }
+
+  public void setKeyCols(final ArrayList<ExprNodeDesc> keyCols) {
+    this.keyCols = keyCols;
+  }
+
+  public int getNumDistributionKeys() {
+    return this.numDistributionKeys;
+  }
+
+  public void setNumDistributionKeys(int numKeys) {
+    this.numDistributionKeys = numKeys;
+  }
+
+  @Explain(displayName = "value expressions")
+  public ArrayList<ExprNodeDesc> getValueCols() {
+    return valueCols;
+  }
+
+  public void setValueCols(final ArrayList<ExprNodeDesc> valueCols) {
+    this.valueCols = valueCols;
+  }
+
+  @Explain(displayName = "Map-reduce partition columns")
+  public ArrayList<ExprNodeDesc> getPartitionCols() {
+    return partitionCols;
+  }
+
+  public void setPartitionCols(
+      final ArrayList<ExprNodeDesc> partitionCols) {
+    this.partitionCols = partitionCols;
+  }
+
+  @Explain(displayName = "tag")
+  public int getTag() {
+    return tag;
+  }
+
+  public void setTag(int tag) {
+    this.tag = tag;
+  }
+
+  /**
+   * Returns the number of reducers for the map-reduce job. -1 means to decide
+   * the number of reducers at runtime. This enables Hive to estimate the number
+   * of reducers based on the map-reduce input data size, which is only
+   * available right before we start the map-reduce job.
+   */
+  public int getNumReducers() {
+    return numReducers;
+  }
+
+  public void setNumReducers(int numReducers) {
+    this.numReducers = numReducers;
+  }
+
+  public TableDesc getKeySerializeInfo() {
+    return keySerializeInfo;
+  }
+
+  public void setKeySerializeInfo(TableDesc keySerializeInfo) {
+    this.keySerializeInfo = keySerializeInfo;
+  }
+
+  public TableDesc getValueSerializeInfo() {
+    return valueSerializeInfo;
+  }
+
+  public void setValueSerializeInfo(TableDesc valueSerializeInfo) {
+    this.valueSerializeInfo = valueSerializeInfo;
+  }
+
+  /**
+   * Returns the sort order of the key columns.
+   *
+   * @return null, which means ascending order for all key columns, or a String
+   *         of the same length as key columns, that consists of only "+"
+   *         (ascending order) and "-" (descending order).
+   */
+  @Explain(displayName = "sort order")
+  public String getOrder() {
+    return keySerializeInfo.getProperties().getProperty(
+        org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER);
+  }
+
+  public void setOrder(String orderStr) {
+    keySerializeInfo.getProperties().setProperty(
+        org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER,
+        orderStr);
+  }
+
+  public List<List<Integer>> getDistinctColumnIndices() {
+    return distinctColumnIndices;
+  }
+
+  public void setDistinctColumnIndices(
+      List<List<Integer>> distinctColumnIndices) {
+    this.distinctColumnIndices = distinctColumnIndices;
+  }
+}
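
As the getOrder() javadoc above describes, the sort order is encoded one
character per key column. A small usage sketch, assuming desc is an instance
of a BaseReduceSinkDesc subclass whose keySerializeInfo has already been set
(setOrder dereferences keySerializeInfo, so it must be non-null):

    desc.setOrder("+-");            // first key ascending, second descending
    String order = desc.getOrder(); // "+-"; a null result means all ascending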

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationCompositeDesc.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+
+
+/**
+ * Correlation composite operator Descriptor implementation.
+ *
+ */
+@Explain(displayName = "Correlation Composite Operator")
+public class CorrelationCompositeDesc extends AbstractOperatorDesc {
+
+  private static final long serialVersionUID = 1L;
+
+  private ReduceSinkOperator correspondingReduceSinkOperator;
+
+  public CorrelationCompositeDesc() {
+
+  }
+
+  public CorrelationCompositeDesc(ReduceSinkOperator correspondingReduceSinkOperator) {
+    this.correspondingReduceSinkOperator = correspondingReduceSinkOperator;
+  }
+
+  public void setCorrespondingReduceSinkOperator(
+      ReduceSinkOperator correspondingReduceSinkOperator) {
+    this.correspondingReduceSinkOperator = correspondingReduceSinkOperator;
+  }
+
+  public ReduceSinkOperator getCorrespondingReduceSinkOperator() {
+    return correspondingReduceSinkOperator;
+  }
+
+  private int[] allOperationPathTags;
+
+  public void setAllOperationPathTags(int[] allOperationPathTags) {
+    this.allOperationPathTags = allOperationPathTags;
+  }
+
+  public int[] getAllOperationPathTags() {
+    return allOperationPathTags;
+  }
+
+}
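
A minimal wiring sketch, assuming rsOp is a ReduceSinkOperator supplied by the
surrounding optimizer pass and using placeholder tag values:

    CorrelationCompositeDesc ccDesc = new CorrelationCompositeDesc(rsOp);
    // Tags for the operation paths feeding this composite operator.
    ccDesc.setAllOperationPathTags(new int[] {0, 1});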

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationLocalSimulativeReduceSinkDesc.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+/**
+ * CorrelationLocalSimulativeReduceSinkDesc.
+ *
+ */
+@Explain(displayName = "Correlation Local Simulative Reduce Output Operator")
+public class CorrelationLocalSimulativeReduceSinkDesc extends BaseReduceSinkDesc {
+  private static final long serialVersionUID = 1L;
+
+  public CorrelationLocalSimulativeReduceSinkDesc() {
+  }
+
+  // A CorrelationLocalSimulativeReduceSinkDesc is only generated from a corresponding
+  // ReduceSinkDesc.
+  public CorrelationLocalSimulativeReduceSinkDesc(ReduceSinkDesc reduceSinkDesc){
+    this.keyCols = reduceSinkDesc.getKeyCols();
+    this.numDistributionKeys = reduceSinkDesc.getNumDistributionKeys();
+    this.valueCols = reduceSinkDesc.getValueCols();
+    this.outputKeyColumnNames = reduceSinkDesc.getOutputKeyColumnNames();
+    this.outputValueColumnNames = reduceSinkDesc.getOutputValueColumnNames();
+    this.tag = reduceSinkDesc.getTag();
+    this.numReducers = reduceSinkDesc.getNumReducers();
+    this.partitionCols = reduceSinkDesc.getPartitionCols();
+    this.keySerializeInfo = reduceSinkDesc.getKeySerializeInfo();
+    this.valueSerializeInfo = reduceSinkDesc.getValueSerializeInfo();
+    this.distinctColumnIndices = reduceSinkDesc.getDistinctColumnIndices();
+  }
+}
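
Note that this constructor copies references rather than cloning: the
simulative descriptor shares its key/value columns and serialization info with
the originating ReduceSinkDesc. A sketch, assuming rsDesc is a fully populated
ReduceSinkDesc:

    CorrelationLocalSimulativeReduceSinkDesc sim =
        new CorrelationLocalSimulativeReduceSinkDesc(rsDesc);
    // Shared, not copied: mutations on one side are visible on the other.
    assert sim.getKeyCols() == rsDesc.getKeyCols();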

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CorrelationReducerDispatchDesc.java Sun Sep 30 20:41:01 2012
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+
+/**
+ * Correlation dispatch operator Descriptor implementation.
+ *
+ */
+@Explain(displayName = "Correlation Dispatch Operator")
+public class CorrelationReducerDispatchDesc extends AbstractOperatorDesc {
+
+  private static final long serialVersionUID = 1L;
+
+  private Map<Integer, Map<Integer, List<Integer>>> dispatchConf;
+  private Map<Integer, Map<Integer, List<SelectDesc>>> dispatchValueSelectDescConf;
+  private Map<Integer, Map<Integer, List<SelectDesc>>> dispatchKeySelectDescConf;
+
+  public CorrelationReducerDispatchDesc(){
+    this.dispatchConf = new HashMap<Integer, Map<Integer, List<Integer>>>();
+    this.dispatchValueSelectDescConf = new HashMap<Integer, Map<Integer, List<SelectDesc>>>();
+    this.dispatchKeySelectDescConf = new HashMap<Integer, Map<Integer, List<SelectDesc>>>();
+
+  }
+
+  public CorrelationReducerDispatchDesc(Map<Integer, Map<Integer, List<Integer>>> dispatchConf){
+    this.dispatchConf = dispatchConf;
+    this.dispatchValueSelectDescConf = new HashMap<Integer, Map<Integer,List<SelectDesc>>>();
+    this.dispatchKeySelectDescConf = new HashMap<Integer, Map<Integer,List<SelectDesc>>>();
+    for(Entry<Integer, Map<Integer, List<Integer>>> entry: this.dispatchConf.entrySet()){
+      HashMap<Integer, List<SelectDesc>> tmp = new HashMap<Integer, List<SelectDesc>>();
+      for(Integer child: entry.getValue().keySet()){
+        tmp.put(child, new ArrayList<SelectDesc>());
+        tmp.get(child).add(new SelectDesc(true));
+      }
+      this.dispatchValueSelectDescConf.put(entry.getKey(), tmp);
+      this.dispatchKeySelectDescConf.put(entry.getKey(), tmp);
+    }
+  }
+
+  public CorrelationReducerDispatchDesc(Map<Integer, Map<Integer, List<Integer>>> dispatchConf,
+      Map<Integer, Map<Integer, List<SelectDesc>>> dispatchKeySelectDescConf,
+      Map<Integer, Map<Integer, List<SelectDesc>>> dispatchValueSelectDescConf){
+    this.dispatchConf = dispatchConf;
+    this.dispatchValueSelectDescConf = dispatchValueSelectDescConf;
+    this.dispatchKeySelectDescConf = dispatchKeySelectDescConf;
+  }
+
+  public void setDispatchConf(Map<Integer, Map<Integer, List<Integer>>> dispatchConf){
+    this.dispatchConf = dispatchConf;
+  }
+
+  public Map<Integer, Map<Integer, List<Integer>>> getDispatchConf(){
+    return this.dispatchConf;
+  }
+
+  public void setDispatchValueSelectDescConf(Map<Integer, Map<Integer,List<SelectDesc>>> dispatchValueSelectDescConf){
+    this.dispatchValueSelectDescConf = dispatchValueSelectDescConf;
+  }
+
+  public Map<Integer, Map<Integer,List<SelectDesc>>> getDispatchValueSelectDescConf(){
+    return this.dispatchValueSelectDescConf;
+  }
+
+  public void setDispatchKeySelectDescConf(Map<Integer, Map<Integer,List<SelectDesc>>> dispatchKeySelectDescConf){
+    this.dispatchKeySelectDescConf = dispatchKeySelectDescConf;
+  }
+
+  public Map<Integer, Map<Integer, List<SelectDesc>>> getDispatchKeySelectDescConf() {
+    return this.dispatchKeySelectDescConf;
+  }
+
+}
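
A hypothetical sketch of building the nested map the one-argument constructor
expects. Reading the constructor, the inner key is a child operator index;
the meaning of the outer key (presumably an operation path tag) is an
assumption here, and all integer values are placeholders. Note that the
constructor registers the same inner map instance for both the key and value
SelectDesc configurations, seeded with pass-through SelectDesc(true) entries.
Assumes the java.util imports listed at the top of the file plus
java.util.Arrays:

    Map<Integer, Map<Integer, List<Integer>>> dispatchConf =
        new HashMap<Integer, Map<Integer, List<Integer>>>();
    Map<Integer, List<Integer>> perChild = new HashMap<Integer, List<Integer>>();
    perChild.put(0, Arrays.asList(0));   // child 0 receives tag 0
    dispatchConf.put(0, perChild);
    CorrelationReducerDispatchDesc dispatchDesc =
        new CorrelationReducerDispatchDesc(dispatchConf);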

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Sun Sep 30 20:41:01 2012
@@ -71,6 +71,7 @@ public class MapredWork extends Abstract
   private Long minSplitSizePerRack;
 
   private boolean needsTagging;
+  private boolean needsOperationPathTagging;
   private boolean hadoopSupportsSplittable;
 
   private MapredLocalWork mapLocalWork;
@@ -339,6 +340,16 @@ public class MapredWork extends Abstract
     this.needsTagging = needsTagging;
   }
 
+  //TODO: enable the annotation shown below
+  // @Explain(displayName = "Needs Operation Paths Tagging", normalExplain = false)
+  public boolean getNeedsOperationPathTagging() {
+    return needsOperationPathTagging;
+  }
+
+  public void setNeedsOperationPathTagging(boolean needsOperationPathTagging) {
+    this.needsOperationPathTagging = needsOperationPathTagging;
+  }
+
   public boolean getHadoopSupportsSplittable() {
     return hadoopSupportsSplittable;
   }
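
The new flag parallels the existing needsTagging flag and defaults to false;
the TestExecDriver change below sets it explicitly. A one-line sketch,
assuming mr is a MapredWork instance:

    mr.setNeedsTagging(true);               // existing inter-table tagging
    mr.setNeedsOperationPathTagging(false); // flag introduced by this commit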

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Sun Sep 30 20:41:01 2012
@@ -27,58 +27,44 @@ import java.util.List;
  *
  */
 @Explain(displayName = "Reduce Output Operator")
-public class ReduceSinkDesc extends AbstractOperatorDesc {
+public class ReduceSinkDesc extends BaseReduceSinkDesc {
   private static final long serialVersionUID = 1L;
-  /**
-   * Key columns are passed to reducer in the "key".
-   */
-  private java.util.ArrayList<ExprNodeDesc> keyCols;
-  private java.util.ArrayList<java.lang.String> outputKeyColumnNames;
-  private List<List<Integer>> distinctColumnIndices;
-  /**
-   * Value columns are passed to reducer in the "value".
-   */
-  private java.util.ArrayList<ExprNodeDesc> valueCols;
-  private java.util.ArrayList<java.lang.String> outputValueColumnNames;
-  /**
-   * Describe how to serialize the key.
-   */
-  private TableDesc keySerializeInfo;
-  /**
-   * Describe how to serialize the value.
-   */
-  private TableDesc valueSerializeInfo;
-
-  /**
-   * The tag for this reducesink descriptor.
-   */
-  private int tag;
-
-  /**
-   * Number of distribution keys.
-   */
-  private int numDistributionKeys;
-
-  /**
-   * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language).
-   * Partition columns decide the reducer that the current row goes to.
-   * Partition columns are not passed to reducer.
-   */
-  private java.util.ArrayList<ExprNodeDesc> partitionCols;
 
-  private int numReducers;
+  private boolean needsOperationPathTagging;
+
+  public boolean getNeedsOperationPathTagging() {
+    return needsOperationPathTagging;
+  }
+
+  public void setNeedsOperationPathTagging(boolean isOperationPathTagged) {
+    this.needsOperationPathTagging = isOperationPathTagged;
+  }
 
   public ReduceSinkDesc() {
   }
 
-  public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
+  public ReduceSinkDesc(ArrayList<ExprNodeDesc> keyCols,
+	      int numDistributionKeys,
+	      ArrayList<ExprNodeDesc> valueCols,
+	      ArrayList<String> outputKeyColumnNames,
+	      List<List<Integer>> distinctColumnIndices,
+	      ArrayList<String> outputValueColumnNames, int tag,
+	      ArrayList<ExprNodeDesc> partitionCols, int numReducers,
+	      final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
+    this(keyCols, numDistributionKeys, valueCols,
+      outputKeyColumnNames, distinctColumnIndices, outputValueColumnNames, tag,
+      partitionCols, numReducers, keySerializeInfo, valueSerializeInfo, false);
+  }
+
+  public ReduceSinkDesc(ArrayList<ExprNodeDesc> keyCols,
       int numDistributionKeys,
-      java.util.ArrayList<ExprNodeDesc> valueCols,
-      java.util.ArrayList<java.lang.String> outputKeyColumnNames,
+      ArrayList<ExprNodeDesc> valueCols,
+      ArrayList<String> outputKeyColumnNames,
       List<List<Integer>> distinctColumnIndices,
-      java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
-      java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
-      final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
+      ArrayList<String> outputValueColumnNames, int tag,
+      ArrayList<ExprNodeDesc> partitionCols, int numReducers,
+      final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo,
+      boolean needsOperationPathTagging) {
     this.keyCols = keyCols;
     this.numDistributionKeys = numDistributionKeys;
     this.valueCols = valueCols;
@@ -90,6 +76,7 @@ public class ReduceSinkDesc extends Abst
     this.keySerializeInfo = keySerializeInfo;
     this.valueSerializeInfo = valueSerializeInfo;
     this.distinctColumnIndices = distinctColumnIndices;
+    this.needsOperationPathTagging = needsOperationPathTagging;
   }
 
   @Override
@@ -112,127 +99,7 @@ public class ReduceSinkDesc extends Abst
     desc.setPartitionCols((ArrayList<ExprNodeDesc>) getPartitionCols().clone());
     desc.setKeySerializeInfo((TableDesc) getKeySerializeInfo().clone());
     desc.setValueSerializeInfo((TableDesc) getValueSerializeInfo().clone());
+    desc.setNeedsOperationPathTagging(needsOperationPathTagging);
     return desc;
   }
-
-  public java.util.ArrayList<java.lang.String> getOutputKeyColumnNames() {
-    return outputKeyColumnNames;
-  }
-
-  public void setOutputKeyColumnNames(
-      java.util.ArrayList<java.lang.String> outputKeyColumnNames) {
-    this.outputKeyColumnNames = outputKeyColumnNames;
-  }
-
-  public java.util.ArrayList<java.lang.String> getOutputValueColumnNames() {
-    return outputValueColumnNames;
-  }
-
-  public void setOutputValueColumnNames(
-      java.util.ArrayList<java.lang.String> outputValueColumnNames) {
-    this.outputValueColumnNames = outputValueColumnNames;
-  }
-
-  @Explain(displayName = "key expressions")
-  public java.util.ArrayList<ExprNodeDesc> getKeyCols() {
-    return keyCols;
-  }
-
-  public void setKeyCols(final java.util.ArrayList<ExprNodeDesc> keyCols) {
-    this.keyCols = keyCols;
-  }
-
-  public int getNumDistributionKeys() {
-    return this.numDistributionKeys;
-  }
-
-  public void setNumDistributionKeys(int numKeys) {
-    this.numDistributionKeys = numKeys;
-  }
-
-  @Explain(displayName = "value expressions")
-  public java.util.ArrayList<ExprNodeDesc> getValueCols() {
-    return valueCols;
-  }
-
-  public void setValueCols(final java.util.ArrayList<ExprNodeDesc> valueCols) {
-    this.valueCols = valueCols;
-  }
-
-  @Explain(displayName = "Map-reduce partition columns")
-  public java.util.ArrayList<ExprNodeDesc> getPartitionCols() {
-    return partitionCols;
-  }
-
-  public void setPartitionCols(
-      final java.util.ArrayList<ExprNodeDesc> partitionCols) {
-    this.partitionCols = partitionCols;
-  }
-
-  @Explain(displayName = "tag")
-  public int getTag() {
-    return tag;
-  }
-
-  public void setTag(int tag) {
-    this.tag = tag;
-  }
-
-  /**
-   * Returns the number of reducers for the map-reduce job. -1 means to decide
-   * the number of reducers at runtime. This enables Hive to estimate the number
-   * of reducers based on the map-reduce input data size, which is only
-   * available right before we start the map-reduce job.
-   */
-  public int getNumReducers() {
-    return numReducers;
-  }
-
-  public void setNumReducers(int numReducers) {
-    this.numReducers = numReducers;
-  }
-
-  public TableDesc getKeySerializeInfo() {
-    return keySerializeInfo;
-  }
-
-  public void setKeySerializeInfo(TableDesc keySerializeInfo) {
-    this.keySerializeInfo = keySerializeInfo;
-  }
-
-  public TableDesc getValueSerializeInfo() {
-    return valueSerializeInfo;
-  }
-
-  public void setValueSerializeInfo(TableDesc valueSerializeInfo) {
-    this.valueSerializeInfo = valueSerializeInfo;
-  }
-
-  /**
-   * Returns the sort order of the key columns.
-   *
-   * @return null, which means ascending order for all key columns, or a String
-   *         of the same length as key columns, that consists of only "+"
-   *         (ascending order) and "-" (descending order).
-   */
-  @Explain(displayName = "sort order")
-  public String getOrder() {
-    return keySerializeInfo.getProperties().getProperty(
-        org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER);
-  }
-
-  public void setOrder(String orderStr) {
-    keySerializeInfo.getProperties().setProperty(
-        org.apache.hadoop.hive.serde.Constants.SERIALIZATION_SORT_ORDER,
-        orderStr);
-  }
-
-  public List<List<Integer>> getDistinctColumnIndices() {
-    return distinctColumnIndices;
-  }
-
-  public void setDistinctColumnIndices(
-      List<List<Integer>> distinctColumnIndices) {
-    this.distinctColumnIndices = distinctColumnIndices;
-  }
 }
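
With the shared fields hoisted into BaseReduceSinkDesc, ReduceSinkDesc now
carries only the needsOperationPathTagging flag, and the legacy 11-argument
constructor delegates to the new 12-argument one with the flag off. A sketch,
assuming all collection and TableDesc arguments are prepared by the caller as
in SemanticAnalyzer:

    ReduceSinkDesc rsDesc = new ReduceSinkDesc(keyCols, numDistributionKeys,
        valueCols, outputKeyColumnNames, distinctColumnIndices,
        outputValueColumnNames, tag, partitionCols, numReducers,
        keySerializeInfo, valueSerializeInfo);
    // Equivalent to passing needsOperationPathTagging = false explicitly.
    assert !rsDesc.getNeedsOperationPathTagging();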

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableScanDesc.java Sun Sep 30 20:41:01 2012
@@ -50,6 +50,8 @@ public class TableScanDesc extends Abstr
   private boolean gatherStats;
   private boolean statsReliable;
 
+  private boolean forwardRowNumber = false;
+
   private ExprNodeDesc filterExpr;
 
   public static final String FILTER_EXPR_CONF_STR =
@@ -103,6 +105,14 @@ public class TableScanDesc extends Abstr
     return partColumns;
   }
 
+  public boolean isForwardRowNumber() {
+    return forwardRowNumber;
+  }
+
+  public void setForwardRowNumber(boolean forwardRowNumber) {
+    this.forwardRowNumber = forwardRowNumber;
+  }
+
   public void setGatherStats(boolean gatherStats) {
     this.gatherStats = gatherStats;
   }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java?rev=1392105&r1=1392104&r2=1392105&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java Sun Sep 30 20:41:01 2012
@@ -277,6 +277,7 @@ public class TestExecDriver extends Test
   private void populateMapRedPlan3(Table src, Table src2) throws SemanticException {
     mr.setNumReduceTasks(Integer.valueOf(5));
     mr.setNeedsTagging(true);
+    mr.setNeedsOperationPathTagging(false);
     ArrayList<String> outputColumns = new ArrayList<String>();
     for (int i = 0; i < 2; i++) {
       outputColumns.add("_col" + i);

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q Sun Sep 30 20:41:01 2012
@@ -0,0 +1,24 @@
+-- the query is from auto_join26.q
+
+CREATE TABLE dest_co1(key INT, cnt INT);
+CREATE TABLE dest_co2(key INT, cnt INT);
+
+set hive.optimize.correlation=false;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key;
+
+INSERT OVERWRITE TABLE dest_co1
+SELECT  x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key;
+
+INSERT OVERWRITE TABLE dest_co2
+SELECT  x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key;
+
+-- dest_co1 and dest_co2 should be same
+SELECT * FROM dest_co1 x ORDER BY x.key;
+SELECT * FROM dest_co2 x ORDER BY x.key;
\ No newline at end of file

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer2.q?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer2.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer2.q Sun Sep 30 20:41:01 2012
@@ -0,0 +1,44 @@
+-- the query is modified from join18.q
+
+CREATE TABLE dest_co1(key1 INT, cnt1 INT, key2 INT, cnt2 INT);
+CREATE TABLE dest_co2(key1 INT, cnt1 INT, key2 INT, cnt2 INT);
+
+set hive.optimize.correlation=false;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT a.key, a.cnt, b.key, b.cnt
+FROM
+(SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+JOIN
+(SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ON (a.key = b.key);
+
+INSERT OVERWRITE TABLE dest_co1
+SELECT a.key, a.cnt, b.key, b.cnt
+FROM
+(SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+JOIN
+(SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ON (a.key = b.key);
+
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT a.key, a.cnt, b.key, b.cnt
+FROM
+(SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+JOIN
+(SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ON (a.key = b.key);
+
+INSERT OVERWRITE TABLE dest_co2
+SELECT a.key, a.cnt, b.key, b.cnt
+FROM
+(SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+JOIN
+(SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ON (a.key = b.key);
+
+-- dest_co1 and dest_co2 should be same
+SELECT * FROM dest_co1 x ORDER BY x.key1, x.key2, x.cnt1, x.cnt2;
+SELECT * FROM dest_co2 x ORDER BY x.key1, x.key2, x.cnt1, x.cnt2;
\ No newline at end of file

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q Sun Sep 30 20:41:01 2012
@@ -0,0 +1,43 @@
+CREATE TABLE dest_co1(key INT, cnt INT, value STRING);
+CREATE TABLE dest_co2(key INT, cnt INT, value STRING);
+
+
+set hive.optimize.correlation=false;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT b.key, b.cnt, d.value
+FROM
+(SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+JOIN
+(SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+ON b.key = d.key;
+
+INSERT OVERWRITE TABLE dest_co1
+SELECT b.key, b.cnt, d.value
+FROM
+(SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+JOIN
+(SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+ON b.key = d.key;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT b.key, b.cnt, d.value
+FROM
+(SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+JOIN
+(SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+ON b.key = d.key;
+
+INSERT OVERWRITE TABLE dest_co2
+SELECT b.key, b.cnt, d.value
+FROM
+(SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+JOIN
+(SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+ON b.key = d.key;
+
+-- dest_co1 and dest_co2 should be same
+SELECT * FROM dest_co1 x ORDER BY x.key, x.cnt, x.value;
+SELECT * FROM dest_co2 x ORDER BY x.key, x.cnt, x.value;
\ No newline at end of file

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q Sun Sep 30 20:41:01 2012
@@ -0,0 +1,112 @@
+CREATE TABLE T1(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+CREATE TABLE T2(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+CREATE TABLE dest_co1(key INT, cnt INT);
+CREATE TABLE dest_co2(key INT, cnt INT);
+
+set hive.optimize.correlation=false;
+-- INNER JOIN should be optimized
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT y.key, count(1) FROM T2 x JOIN T1 y ON (x.key = y.key) GROUP BY y.key;
+INSERT OVERWRITE TABLE dest_co1
+SELECT y.key, count(1) FROM T2 x JOIN T1 y ON (x.key = y.key) GROUP BY y.key;
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT y.key, count(1) FROM T2 x JOIN T1 y ON (x.key = y.key) GROUP BY y.key;
+INSERT OVERWRITE TABLE dest_co2
+SELECT y.key, count(1) FROM T2 x JOIN T1 y ON (x.key = y.key) GROUP BY y.key;
+-- dest_co1 and dest_co2 should be same
+SELECT * FROM dest_co1 x ORDER BY x.key, x.cnt;
+SELECT * FROM dest_co2 x ORDER BY x.key, x.cnt;
+
+set hive.optimize.correlation=false;
+-- The case that GROUP BY key is from the left table of LEFT OUTER JOIN should be optimized
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT x.key, count(1) FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY x.key;
+INSERT OVERWRITE TABLE dest_co1
+SELECT x.key, count(1) FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY x.key;
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT x.key, count(1) FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY x.key;
+INSERT OVERWRITE TABLE dest_co2
+SELECT x.key, count(1) FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY x.key;
+-- dest_co1 and dest_co2 should be same
+SELECT * FROM dest_co1 x ORDER BY x.key, x.cnt;
+SELECT * FROM dest_co2 x ORDER BY x.key, x.cnt;
+
+
+set hive.optimize.correlation=false;
+-- The case that GROUP BY key is from the right table of RIGHT OUTER JOIN should be optimized
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT y.key, count(1) FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY y.key;
+INSERT OVERWRITE TABLE dest_co1
+SELECT y.key, count(1) FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY y.key;
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT y.key, count(1) FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY y.key;
+INSERT OVERWRITE TABLE dest_co2
+SELECT y.key, count(1) FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY y.key;
+-- dest_co1 and dest_co2 should be same
+SELECT * FROM dest_co1 x ORDER BY x.key, x.cnt;
+SELECT * FROM dest_co2 x ORDER BY x.key, x.cnt;
+
+
+set hive.optimize.correlation=false;
+-- The case that GROUP BY key is from the right table of LEFT OUTER JOIN should not be optimized
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT y.key, count(1) FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY y.key;
+INSERT OVERWRITE TABLE dest_co1
+SELECT y.key, count(1) FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY y.key;
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT y.key, count(1) FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY y.key;
+INSERT OVERWRITE TABLE dest_co2
+SELECT y.key, count(1) FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY y.key;
+-- dest_co1 and dest_co2 should be same
+SELECT * FROM dest_co1 x ORDER BY x.key, x.cnt;
+SELECT * FROM dest_co2 x ORDER BY x.key, x.cnt;
+
+
+set hive.optimize.correlation=false;
+-- The case that GROUP BY key is from the left table of RIGHT OUTER JOIN should not be optimized
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT x.key, count(1) FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY x.key;
+INSERT OVERWRITE TABLE dest_co1
+SELECT x.key, count(1) FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY x.key;
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT x.key, count(1) FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY x.key;
+INSERT OVERWRITE TABLE dest_co2
+SELECT x.key, count(1) FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) GROUP BY x.key;
+-- dest_co1 and dest_co2 should be same
+SELECT * FROM dest_co1 x ORDER BY x.key, x.cnt;
+SELECT * FROM dest_co2 x ORDER BY x.key, x.cnt;
+
+set hive.optimize.correlation=false;
+-- FULL OUTER JOIN will not be optimized
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT x.key, count(1) FROM T2 x FULL OUTER JOIN T1 y ON (x.key = y.key) GROUP BY x.key;
+INSERT OVERWRITE TABLE dest_co1
+SELECT x.key, count(1) FROM T2 x FULL OUTER JOIN T1 y ON (x.key = y.key) GROUP BY x.key;
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT x.key, count(1) FROM T2 x FULL OUTER JOIN T1 y ON (x.key = y.key) GROUP BY x.key;
+INSERT OVERWRITE TABLE dest_co2
+SELECT x.key, count(1) FROM T2 x FULL OUTER JOIN T1 y ON (x.key = y.key) GROUP BY x.key;
+-- dest_co1 and dest_co2 should be same
+SELECT * FROM dest_co1 x ORDER BY x.key, x.cnt;
+SELECT * FROM dest_co2 x ORDER BY x.key, x.cnt;

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q Sun Sep 30 20:41:01 2012
@@ -0,0 +1,74 @@
+CREATE TABLE T1(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+CREATE TABLE T2(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+CREATE TABLE T3(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T3;
+
+CREATE TABLE dest_co1(key INT, cnt INT);
+CREATE TABLE dest_co2(key INT, cnt INT);
+
+set hive.optimize.correlation=false;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT y.key, count(1) FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key) GROUP BY y.key;
+INSERT OVERWRITE TABLE dest_co1
+SELECT y.key, count(1) FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key) GROUP BY y.key;
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT y.key, count(1) FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key) GROUP BY y.key;
+INSERT OVERWRITE TABLE dest_co2
+SELECT y.key, count(1) FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key) GROUP BY y.key;
+-- dest_co1 and dest_co2 should be the same
+SELECT * FROM dest_co1 x ORDER BY x.key, x.cnt;
+SELECT * FROM dest_co2 x ORDER BY x.key, x.cnt;
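+-- Note: a sketch of why this case is a candidate for merging, assuming the
+-- optimizer looks for adjacent operators that partition on the same key. Both
+-- joins and the GROUP BY key on the same column, and an inner join never emits
+-- a NULL join key, so one shuffle on key can serve the whole pipeline, roughly:
+--   SELECT tmp.key, count(1)
+--   FROM (SELECT y.key FROM T2 x JOIN T1 y ON (x.key = y.key)
+--         JOIN T3 z ON (y.key = z.key)) tmp
+--   GROUP BY tmp.key;
+-- with the joins and the aggregation evaluated in the same reduce stage.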
+
+set hive.optimize.correlation=false;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT x.key, count(1) FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key) GROUP BY x.key;
+INSERT OVERWRITE TABLE dest_co1
+SELECT x.key, count(1) FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key) GROUP BY x.key;
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT x.key, count(1) FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key) GROUP BY x.key;
+INSERT OVERWRITE TABLE dest_co2
+SELECT x.key, count(1) FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key) GROUP BY x.key;
+-- dest_co1 and dest_co2 should be the same
+SELECT * FROM dest_co1 x ORDER BY x.key, x.cnt;
+SELECT * FROM dest_co2 x ORDER BY x.key, x.cnt;
+
+set hive.optimize.correlation=false;
+-- Chained RIGHT OUTER JOINs with the GROUP BY key taken from the rightmost table
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT z.key, count(1) FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key) GROUP BY z.key;
+INSERT OVERWRITE TABLE dest_co1
+SELECT z.key, count(1) FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key) GROUP BY z.key;
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT z.key, count(1) FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key) GROUP BY z.key;
+INSERT OVERWRITE TABLE dest_co2
+SELECT z.key, count(1) FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key) GROUP BY z.key;
+-- dest_co1 and dest_co2 should be the same
+SELECT * FROM dest_co1 x ORDER BY x.key, x.cnt;
+SELECT * FROM dest_co2 x ORDER BY x.key, x.cnt;
+
+set hive.optimize.correlation=false;
+-- A FULL OUTER JOIN in the chain will not be optimized (see the note below)
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT y.key, count(1) FROM T2 x JOIN T1 y ON (x.key = y.key) FULL OUTER JOIN T3 z ON (y.key = z.key) GROUP BY y.key;
+INSERT OVERWRITE TABLE dest_co1
+SELECT y.key, count(1) FROM T2 x JOIN T1 y ON (x.key = y.key) FULL OUTER JOIN T3 z ON (y.key = z.key) GROUP BY y.key;
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT y.key, count(1) FROM T2 x JOIN T1 y ON (x.key = y.key) FULL OUTER JOIN T3 z ON (y.key = z.key) GROUP BY y.key;
+INSERT OVERWRITE TABLE dest_co2
+SELECT y.key, count(1) FROM T2 x JOIN T1 y ON (x.key = y.key) FULL OUTER JOIN T3 z ON (y.key = z.key) GROUP BY y.key;
+-- dest_co1 and dest_co2 should be the same
+SELECT * FROM dest_co1 x ORDER BY x.key, x.cnt;
+SELECT * FROM dest_co2 x ORDER BY x.key, x.cnt;
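+-- Note: same sketch as in the outer-join cases above. Unmatched z rows from the
+-- FULL OUTER JOIN carry y.key = NULL while having been shuffled on z.key, so the
+-- final GROUP BY y.key cannot reuse the join shuffle and the jobs stay unmerged.
+-- Illustrative (commented-out) query for the offending rows:
+--   SELECT count(1) FROM T2 x JOIN T1 y ON (x.key = y.key)
+--   FULL OUTER JOIN T3 z ON (y.key = z.key) WHERE y.key IS NULL;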

Added: hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer1.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer1.q.out?rev=1392105&view=auto
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer1.q.out (added)
+++ hive/trunk/ql/src/test/results/clientpositive/correlationoptimizer1.q.out Sun Sep 30 20:41:01 2012
@@ -0,0 +1,377 @@
+PREHOOK: query: -- the query is from auto_join26.q
+
+CREATE TABLE dest_co1(key INT, cnt INT)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: -- the query is from auto_join26.q
+
+CREATE TABLE dest_co1(key INT, cnt INT)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@dest_co1
+PREHOOK: query: CREATE TABLE dest_co2(key INT, cnt INT)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: CREATE TABLE dest_co2(key INT, cnt INT)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@dest_co2
+PREHOOK: query: EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key
+POSTHOOK: type: QUERY
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME src1) x) (TOK_TABREF (TOK_TABNAME src) y) (= (. (TOK_TABLE_OR_COL x) key) (. (TOK_TABLE_OR_COL y) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME dest_co1))) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL x) key)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_GROUPBY (. (TOK_TABLE_OR_COL x) key))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        x 
+          TableScan
+            alias: x
+            Reduce Output Operator
+              key expressions:
+                    expr: key
+                    type: string
+              sort order: +
+              Map-reduce partition columns:
+                    expr: key
+                    type: string
+              tag: 0
+              value expressions:
+                    expr: key
+                    type: string
+        y 
+          TableScan
+            alias: y
+            Reduce Output Operator
+              key expressions:
+                    expr: key
+                    type: string
+              sort order: +
+              Map-reduce partition columns:
+                    expr: key
+                    type: string
+              tag: 1
+      Reduce Operator Tree:
+        Join Operator
+          condition map:
+               Inner Join 0 to 1
+          condition expressions:
+            0 {VALUE._col0}
+            1 
+          handleSkewJoin: false
+          outputColumnNames: _col0
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: string
+            outputColumnNames: _col0
+            Group By Operator
+              aggregations:
+                    expr: count(1)
+              bucketGroup: false
+              keys:
+                    expr: _col0
+                    type: string
+              mode: hash
+              outputColumnNames: _col0, _col1
+              File Output Operator
+                compressed: false
+                GlobalTableId: 0
+                table:
+                    input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+
+  Stage: Stage-2
+    Map Reduce
+      Alias -> Map Operator Tree:
+#### A masked pattern was here ####
+            Reduce Output Operator
+              key expressions:
+                    expr: _col0
+                    type: string
+              sort order: +
+              Map-reduce partition columns:
+                    expr: _col0
+                    type: string
+              tag: -1
+              value expressions:
+                    expr: _col1
+                    type: bigint
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations:
+                expr: count(VALUE._col0)
+          bucketGroup: false
+          keys:
+                expr: KEY._col0
+                type: string
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Select Operator
+            expressions:
+                  expr: _col0
+                  type: string
+                  expr: _col1
+                  type: bigint
+            outputColumnNames: _col0, _col1
+            Select Operator
+              expressions:
+                    expr: UDFToInteger(_col0)
+                    type: int
+                    expr: UDFToInteger(_col1)
+                    type: int
+              outputColumnNames: _col0, _col1
+              File Output Operator
+                compressed: false
+                GlobalTableId: 1
+                table:
+                    input format: org.apache.hadoop.mapred.TextInputFormat
+                    output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                    serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                    name: default.dest_co1
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.dest_co1
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+
+PREHOOK: query: INSERT OVERWRITE TABLE dest_co1
+SELECT  x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@src1
+PREHOOK: Output: default@dest_co1
+POSTHOOK: query: INSERT OVERWRITE TABLE dest_co1
+SELECT  x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Input: default@src1
+POSTHOOK: Output: default@dest_co1
+POSTHOOK: Lineage: dest_co1.cnt EXPRESSION [(src1)x.null, (src)y.null, ]
+POSTHOOK: Lineage: dest_co1.key EXPRESSION [(src1)x.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key
+PREHOOK: type: QUERY
+POSTHOOK: query: EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key
+POSTHOOK: type: QUERY
+POSTHOOK: Lineage: dest_co1.cnt EXPRESSION [(src1)x.null, (src)y.null, ]
+POSTHOOK: Lineage: dest_co1.key EXPRESSION [(src1)x.FieldSchema(name:key, type:string, comment:default), ]
+ABSTRACT SYNTAX TREE:
+  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME src1) x) (TOK_TABREF (TOK_TABNAME src) y) (= (. (TOK_TABLE_OR_COL x) key) (. (TOK_TABLE_OR_COL y) key)))) (TOK_INSERT (TOK_DESTINATION (TOK_TAB (TOK_TABNAME dest_co2))) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL x) key)) (TOK_SELEXPR (TOK_FUNCTION count 1))) (TOK_GROUPBY (. (TOK_TABLE_OR_COL x) key))))
+
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-0 depends on stages: Stage-1
+  Stage-2 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Map Reduce
+      Alias -> Map Operator Tree:
+        x 
+          TableScan
+            alias: x
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+              outputColumnNames: key
+              Forward
+                Correlation Composite Operator
+                  Reduce Output Operator
+                    key expressions:
+                          expr: key
+                          type: string
+                    sort order: +
+                    Map-reduce partition columns:
+                          expr: key
+                          type: string
+                    tag: 0
+                    value expressions:
+                          expr: key
+                          type: string
+        y 
+          TableScan
+            alias: y
+            Select Operator
+              expressions:
+                    expr: key
+                    type: string
+              outputColumnNames: key
+              Forward
+                Correlation Composite Operator
+                  Reduce Output Operator
+                    key expressions:
+                          expr: key
+                          type: string
+                    sort order: +
+                    Map-reduce partition columns:
+                          expr: key
+                          type: string
+                    tag: 1
+      Reduce Operator Tree:
+        Correlation Dispatch Operator
+          Join Operator
+            condition map:
+                 Inner Join 0 to 1
+            condition expressions:
+              0 {VALUE._col0}
+              1 
+            handleSkewJoin: false
+            outputColumnNames: _col0
+            Select Operator
+              expressions:
+                    expr: _col0
+                    type: string
+              outputColumnNames: _col0
+              Correlation Local Simulative Reduce Output Operator
+                key expressions:
+                      expr: _col0
+                      type: string
+                sort order: +
+                Map-reduce partition columns:
+                      expr: _col0
+                      type: string
+                tag: -1
+                value expressions:
+                      expr: 1
+                      type: int
+                Group By Operator
+                  aggregations:
+                        expr: count(VALUE._col0)
+                  bucketGroup: false
+                  keys:
+                        expr: KEY._col0
+                        type: string
+                  mode: complete
+                  outputColumnNames: _col0, _col1
+                  Select Operator
+                    expressions:
+                          expr: _col0
+                          type: string
+                          expr: _col1
+                          type: bigint
+                    outputColumnNames: _col0, _col1
+                    Select Operator
+                      expressions:
+                            expr: UDFToInteger(_col0)
+                            type: int
+                            expr: UDFToInteger(_col1)
+                            type: int
+                      outputColumnNames: _col0, _col1
+                      File Output Operator
+                        compressed: false
+                        GlobalTableId: 1
+                        table:
+                            input format: org.apache.hadoop.mapred.TextInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                            name: default.dest_co2
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.dest_co2
+
+  Stage: Stage-2
+    Stats-Aggr Operator
+
+
+PREHOOK: query: INSERT OVERWRITE TABLE dest_co2
+SELECT  x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Input: default@src1
+PREHOOK: Output: default@dest_co2
+POSTHOOK: query: INSERT OVERWRITE TABLE dest_co2
+SELECT  x.key, count(1) FROM src1 x JOIN src y ON (x.key = y.key) group by x.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Input: default@src1
+POSTHOOK: Output: default@dest_co2
+POSTHOOK: Lineage: dest_co1.cnt EXPRESSION [(src1)x.null, (src)y.null, ]
+POSTHOOK: Lineage: dest_co1.key EXPRESSION [(src1)x.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest_co2.cnt EXPRESSION [(src1)x.null, (src)y.null, ]
+POSTHOOK: Lineage: dest_co2.key EXPRESSION [(src1)x.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: -- dest_co1 and dest_co2 should be same
+SELECT * FROM dest_co1 x ORDER BY x.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest_co1
+#### A masked pattern was here ####
+POSTHOOK: query: -- dest_co1 and dest_co2 should be same
+SELECT * FROM dest_co1 x ORDER BY x.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest_co1
+#### A masked pattern was here ####
+POSTHOOK: Lineage: dest_co1.cnt EXPRESSION [(src1)x.null, (src)y.null, ]
+POSTHOOK: Lineage: dest_co1.key EXPRESSION [(src1)x.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest_co2.cnt EXPRESSION [(src1)x.null, (src)y.null, ]
+POSTHOOK: Lineage: dest_co2.key EXPRESSION [(src1)x.FieldSchema(name:key, type:string, comment:default), ]
+66	1
+98	2
+128	3
+146	2
+150	1
+213	2
+224	2
+238	2
+255	2
+273	3
+278	2
+311	3
+369	3
+401	5
+406	4
+PREHOOK: query: SELECT * FROM dest_co2 x ORDER BY x.key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@dest_co2
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM dest_co2 x ORDER BY x.key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@dest_co2
+#### A masked pattern was here ####
+POSTHOOK: Lineage: dest_co1.cnt EXPRESSION [(src1)x.null, (src)y.null, ]
+POSTHOOK: Lineage: dest_co1.key EXPRESSION [(src1)x.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: dest_co2.cnt EXPRESSION [(src1)x.null, (src)y.null, ]
+POSTHOOK: Lineage: dest_co2.key EXPRESSION [(src1)x.FieldSchema(name:key, type:string, comment:default), ]
+66	1
+98	2
+128	3
+146	2
+150	1
+213	2
+224	2
+238	2
+255	2
+273	3
+278	2
+311	3
+369	3
+401	5
+406	4