You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/18 11:16:53 UTC

svn commit: r1504395 [3/15] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/if/ ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql...

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1504395&r1=1504394&r2=1504395&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Thu Jul 18 09:16:52 2013
@@ -416,6 +416,22 @@ public class CommonJoinTaskDispatcher ex
     copyReducerConf(mapJoinTask, childTask);
   }
 
+  public static boolean cannotConvert(String bigTableAlias,
+      Map<String, Long> aliasToSize, long aliasTotalKnownInputSize,
+      long ThresholdOfSmallTblSizeSum) {
+    boolean ret = false;
+    Long aliasKnownSize = aliasToSize.get(bigTableAlias);
+    if (aliasKnownSize != null && aliasKnownSize.longValue() > 0) {
+      long smallTblTotalKnownSize = aliasTotalKnownInputSize
+          - aliasKnownSize.longValue();
+      if (smallTblTotalKnownSize > ThresholdOfSmallTblSizeSum) {
+        // this table is not good to be a big table.
+        ret = true;
+      }
+    }
+    return ret;
+  }
+
   @Override
   public Task<? extends Serializable> processCurrentTask(MapRedTask currTask,
       ConditionalTask conditionalTask, Context context)
@@ -564,14 +580,9 @@ public class CommonJoinTaskDispatcher ex
         MapRedTask newTask = newTaskAlias.getFirst();
         bigTableAlias = newTaskAlias.getSecond();
 
-        Long aliasKnownSize = aliasToSize.get(bigTableAlias);
-        if (aliasKnownSize != null && aliasKnownSize.longValue() > 0) {
-          long smallTblTotalKnownSize = aliasTotalKnownInputSize
-              - aliasKnownSize.longValue();
-          if (smallTblTotalKnownSize > ThresholdOfSmallTblSizeSum) {
-            // this table is not good to be a big table.
-            continue;
-          }
+        if (cannotConvert(bigTableAlias, aliasToSize,
+            aliasTotalKnownInputSize, ThresholdOfSmallTblSizeSum)) {
+          continue;
         }
 
         // add into conditional task

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1504395&r1=1504394&r2=1504395&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Thu Jul 18 09:16:52 2013
@@ -3499,7 +3499,8 @@ public class SemanticAnalyzer extends Ba
     }
 
     List<List<Integer>> distinctColIndices = getDistinctColIndicesForReduceSink(parseInfo, dest,
-        reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames);
+        reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames,
+        colExprMap);
 
     ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
     HashMap<String, ASTNode> aggregationTrees = parseInfo
@@ -3572,7 +3573,8 @@ public class SemanticAnalyzer extends Ba
   private List<List<Integer>> getDistinctColIndicesForReduceSink(QBParseInfo parseInfo,
       String dest,
       List<ExprNodeDesc> reduceKeys, RowResolver reduceSinkInputRowResolver,
-      RowResolver reduceSinkOutputRowResolver, List<String> outputKeyColumnNames)
+      RowResolver reduceSinkOutputRowResolver, List<String> outputKeyColumnNames,
+      Map<String, ExprNodeDesc> colExprMap)
       throws SemanticException {
 
     List<List<Integer>> distinctColIndices = new ArrayList<List<Integer>>();
@@ -3611,6 +3613,7 @@ public class SemanticAnalyzer extends Ba
           ColumnInfo colInfo = new ColumnInfo(field, expr.getTypeInfo(), null, false);
           reduceSinkOutputRowResolver.putExpression(parameter, colInfo);
           numExprs++;
+          colExprMap.put(colInfo.getInternalName(), expr);
         }
         distinctColIndices.add(distinctIndices);
       }
@@ -3672,7 +3675,8 @@ public class SemanticAnalyzer extends Ba
         colExprMap);
 
     List<List<Integer>> distinctColIndices = getDistinctColIndicesForReduceSink(parseInfo, dest,
-        reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames);
+        reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames,
+        colExprMap);
 
     ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
 
@@ -6952,6 +6956,7 @@ public class SemanticAnalyzer extends Ba
               reduceValues.size() - 1).getTypeInfo(), "", false);
           reduceSinkOutputRowResolver.putExpression(grpbyExpr, colInfo);
           outputColumnNames.add(getColumnInternalName(reduceValues.size() - 1));
+          colExprMap.put(colInfo.getInternalName(), grpByExprNode);
         }
       }
 

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DemuxDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DemuxDesc.java?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DemuxDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DemuxDesc.java Thu Jul 18 09:16:52 2013
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Demux operator descriptor implementation.
+ *
+ */
+@Explain(displayName = "Demux Operator")
+public class DemuxDesc extends AbstractOperatorDesc {
+
+  private static final long serialVersionUID = 1L;
+
+  private Map<Integer, Integer> newTagToOldTag;
+  private Map<Integer, Integer> newTagToChildIndex;
+  private List<TableDesc> keysSerializeInfos;
+  private List<TableDesc> valuesSerializeInfos;
+  private Map<Integer, Integer> childIndexToOriginalNumParents;
+
+  public DemuxDesc() {
+  }
+
+  public DemuxDesc(
+      Map<Integer, Integer> newTagToOldTag,
+      Map<Integer, Integer> newTagToChildIndex,
+      Map<Integer, Integer> childIndexToOriginalNumParents,
+      List<TableDesc> keysSerializeInfos,
+      List<TableDesc> valuesSerializeInfos){
+    this.newTagToOldTag = newTagToOldTag;
+    this.newTagToChildIndex = newTagToChildIndex;
+    this.childIndexToOriginalNumParents = childIndexToOriginalNumParents;
+    this.keysSerializeInfos = keysSerializeInfos;
+    this.valuesSerializeInfos = valuesSerializeInfos;
+  }
+
+  public List<TableDesc> getKeysSerializeInfos() {
+    return keysSerializeInfos;
+  }
+
+  public void setKeysSerializeInfos(List<TableDesc> keysSerializeInfos) {
+    this.keysSerializeInfos = keysSerializeInfos;
+  }
+
+  public List<TableDesc> getValuesSerializeInfos() {
+    return valuesSerializeInfos;
+  }
+
+  public void setValuesSerializeInfos(List<TableDesc> valuesSerializeInfos) {
+    this.valuesSerializeInfos = valuesSerializeInfos;
+  }
+
+  public Map<Integer, Integer> getNewTagToOldTag() {
+    return newTagToOldTag;
+  }
+
+  public void setNewTagToOldTag(Map<Integer, Integer> newTagToOldTag) {
+    this.newTagToOldTag = newTagToOldTag;
+  }
+
+  public Map<Integer, Integer> getNewTagToChildIndex() {
+    return newTagToChildIndex;
+  }
+
+  public void setNewTagToChildIndex(Map<Integer, Integer> newTagToChildIndex) {
+    this.newTagToChildIndex = newTagToChildIndex;
+  }
+
+  public Map<Integer, Integer> getChildIndexToOriginalNumParents() {
+    return childIndexToOriginalNumParents;
+  }
+
+  public void setChildIndexToOriginalNumParents(
+      Map<Integer, Integer> childIndexToOriginalNumParents) {
+    this.childIndexToOriginalNumParents = childIndexToOriginalNumParents;
+  }
+}

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MuxDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MuxDesc.java?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MuxDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MuxDesc.java Thu Jul 18 09:16:52 2013
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+
+
+/**
+ * Mux operator descriptor implementation.
+ *
+ */
+@Explain(displayName = "Mux Operator")
+public class MuxDesc extends AbstractOperatorDesc {
+  private static final long serialVersionUID = 1L;
+
+  private List<List<ExprNodeDesc>> parentToKeyCols;
+  private List<List<ExprNodeDesc>> parentToValueCols;
+  private List<List<String>> parentToOutputKeyColumnNames;
+  private List<List<String>> parentToOutputValueColumnNames;
+  private List<Integer> parentToTag;
+  private Map<Integer, Integer> newParentIndexToOldParentIndex;
+
+  public MuxDesc() {
+  }
+
+  // A MuxDesc is only generated from a corresponding ReduceSinkDesc.
+  public MuxDesc(List<Operator<? extends OperatorDesc>> ops){
+    int numParents = ops.size();
+    parentToKeyCols = new ArrayList<List<ExprNodeDesc>>(numParents);
+    parentToOutputKeyColumnNames = new ArrayList<List<String>>(numParents);
+    parentToValueCols = new ArrayList<List<ExprNodeDesc>>(numParents);
+    parentToOutputValueColumnNames = new ArrayList<List<String>>(numParents);
+    parentToTag = new ArrayList<Integer>(numParents);
+
+    for (Operator<? extends OperatorDesc> op: ops) {
+      if (op != null && op instanceof ReduceSinkOperator) {
+        ReduceSinkOperator rsop = (ReduceSinkOperator)op;
+        List<ExprNodeDesc> keyCols = rsop.getConf().getKeyCols();
+        List<ExprNodeDesc> valueCols = rsop.getConf().getValueCols();
+        List<String> outputKeyColumnNames = rsop.getConf().getOutputKeyColumnNames();
+        List<String> outputValueColumnNames = rsop.getConf().getOutputValueColumnNames();
+        int tag = rsop.getConf().getTag();
+        parentToKeyCols.add(keyCols);
+        parentToValueCols.add(valueCols);
+        parentToOutputKeyColumnNames.add(outputKeyColumnNames);
+        parentToOutputValueColumnNames.add(outputValueColumnNames);
+        parentToTag.add(tag);
+      } else {
+        parentToKeyCols.add(null);
+        parentToValueCols.add(null);
+        parentToOutputKeyColumnNames.add(null);
+        parentToOutputValueColumnNames.add(null);
+        parentToTag.add(null);
+      }
+    }
+  }
+
+  public List<List<ExprNodeDesc>> getParentToKeyCols() {
+    return parentToKeyCols;
+  }
+
+  public void setParentToKeyCols(List<List<ExprNodeDesc>> parentToKeyCols) {
+    this.parentToKeyCols = parentToKeyCols;
+  }
+
+  public List<List<ExprNodeDesc>> getParentToValueCols() {
+    return parentToValueCols;
+  }
+
+  public void setParentToValueCols(List<List<ExprNodeDesc>> parentToValueCols) {
+    this.parentToValueCols = parentToValueCols;
+  }
+
+  public List<List<String>> getParentToOutputKeyColumnNames() {
+    return parentToOutputKeyColumnNames;
+  }
+
+  public void setParentToOutputKeyColumnNames(
+      List<List<String>> parentToOutputKeyColumnNames) {
+    this.parentToOutputKeyColumnNames = parentToOutputKeyColumnNames;
+  }
+
+  public List<List<String>> getParentToOutputValueColumnNames() {
+    return parentToOutputValueColumnNames;
+  }
+
+  public void setParentToOutputValueColumnNames(
+      List<List<String>> parentToOutputValueColumnNames) {
+    this.parentToOutputValueColumnNames = parentToOutputValueColumnNames;
+  }
+
+  public List<Integer> getParentToTag() {
+    return parentToTag;
+  }
+
+  public void setParentToTag(List<Integer> parentToTag) {
+    this.parentToTag = parentToTag;
+  }
+
+  public Map<Integer, Integer> getNewParentIndexToOldParentIndex() {
+    return newParentIndexToOldParentIndex;
+  }
+
+  public void setNewParentIndexToOldParentIndex(
+      Map<Integer, Integer> newParentIndexToOldParentIndex) {
+    this.newParentIndexToOldParentIndex = newParentIndexToOldParentIndex;
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=1504395&r1=1504394&r2=1504395&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Thu Jul 18 09:16:52 2013
@@ -71,13 +71,13 @@ public class ReduceSinkDesc extends Abst
   public ReduceSinkDesc() {
   }
 
-  public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
+  public ReduceSinkDesc(ArrayList<ExprNodeDesc> keyCols,
       int numDistributionKeys,
-      java.util.ArrayList<ExprNodeDesc> valueCols,
-      java.util.ArrayList<java.lang.String> outputKeyColumnNames,
+      ArrayList<ExprNodeDesc> valueCols,
+      ArrayList<String> outputKeyColumnNames,
       List<List<Integer>> distinctColumnIndices,
-      java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
-      java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
+      ArrayList<String> outputValueColumnNames, int tag,
+      ArrayList<ExprNodeDesc> partitionCols, int numReducers,
       final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
     this.keyCols = keyCols;
     this.numDistributionKeys = numDistributionKeys;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionDesc.java?rev=1504395&r1=1504394&r2=1504395&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionDesc.java Thu Jul 18 09:16:52 2013
@@ -26,12 +26,16 @@ package org.apache.hadoop.hive.ql.plan;
 @Explain(displayName = "Union")
 public class UnionDesc extends AbstractOperatorDesc {
   private static final long serialVersionUID = 1L;
-
   private transient int numInputs;
+  // Whether this UnionOperator is inside the reduce side of an MR job generated
+  // by the Correlation Optimizer, i.e. all inputs of this UnionOperator come
+  // from a DemuxOperator. If so, genMapRedTasks should not touch this UnionOperator.
+  private transient boolean allInputsInSameReducer;
 
   @SuppressWarnings("nls")
   public UnionDesc() {
     numInputs = 2;
+    allInputsInSameReducer = false;
   }
 
   /**
@@ -48,4 +52,12 @@ public class UnionDesc extends AbstractO
   public void setNumInputs(int numInputs) {
     this.numInputs = numInputs;
   }
+
+  public boolean isAllInputsInSameReducer() {
+    return allInputsInSameReducer;
+  }
+
+  public void setAllInputsInSameReducer(boolean allInputsInSameReducer) {
+    this.allInputsInSameReducer = allInputsInSameReducer;
+  }
 }

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,281 @@
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- This query has a GroupByOperator following JoinOperator and they share the same keys.
+-- When Correlation Optimizer is turned off, three MR jobs will be generated.
+-- When Correlation Optimizer is turned on, two MR jobs will be generated
+-- and JoinOperator (on the column of key) and GroupByOperator (also on the column
+-- of key) will be executed in the first MR job.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+set hive.auto.convert.join=true;
+set hive.optimize.correlation=true;
+-- Enable hive.auto.convert.join.
+-- Correlation Optimizer will detect that the join will be converted to a Map-join,
+-- so it will not try to optimize this query.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+      
+set hive.auto.convert.join=true;
+set hive.optimize.mapjoin.mapreduce=true;
+set hive.optimize.correlation=true;
+-- Enable hive.auto.convert.join.
+-- Correlation Optimizer will detect that the join will be converted to a Map-join,
+-- so it will not try to optimize this query.
+-- We should generate 1 MR job for subquery tmp.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- If the key of a GroupByOperator is the left table's key in
+-- a Left Semi Join, these two operators will be executed in
+-- the same MR job when Correlation Optimizer is enabled.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x LEFT SEMI JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x LEFT SEMI JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x LEFT SEMI JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x LEFT SEMI JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+      
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- If the key of a GroupByOperator is the left table's key in
+-- a Left Outer Join, these two operators will be executed in
+-- the same MR job when Correlation Optimizer is enabled.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+      
+set hive.optimize.correlation=false;
+-- If the key of a GroupByOperator is the right table's key in
+-- a Left Outer Join, we cannot use a single MR to execute these two 
+-- operators because those keys with a null value are not grouped.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=false;
+-- If the key of a GroupByOperator is the right table's key in
+-- a Right Outer Join, these two operators will be executed in
+-- the same MR job when Correlation Optimizer is enabled.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY y.key) tmp;
+
+
+set hive.optimize.correlation=false;
+-- If the key of a GroupByOperator is the left table's key in
+-- a Right Outer Join, we cannot use a single MR to execute these two 
+-- operators because those keys with a null value are not grouped.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=false;
+-- This query has a Full Outer Join followed by a GroupByOperator and
+-- they share the same key. Because those keys with a null value are not grouped
+-- in the output of the Full Outer Join, we cannot use a single MR to execute
+-- these two operators.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x FULL OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x FULL OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x FULL OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x FULL OUTER JOIN src y ON (x.key = y.key)
+      GROUP BY x.key) tmp;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- Currently, we only handle exactly same keys, this query will not be optimized
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.value)), SUM(HASH(tmp.cnt)) 
+FROM (SELECT x.key AS key, x.value AS value, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key)
+      GROUP BY x.key, x.value) tmp;
+      
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.value)), SUM(HASH(tmp.cnt)) 
+FROM (SELECT x.key AS key, x.value AS value, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key)
+      GROUP BY x.key, x.value) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.value)), SUM(HASH(tmp.cnt)) 
+FROM (SELECT x.key AS key, x.value AS value, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key)
+      GROUP BY x.key, x.value) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.value)), SUM(HASH(tmp.cnt)) 
+FROM (SELECT x.key AS key, x.value AS value, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key)
+      GROUP BY x.key, x.value) tmp;
+
+set hive.optimize.correlation=false;
+-- Currently, we only handle exactly same keys, this query will not be optimized
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key AND x.value = y.value)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key AND x.value = y.value)
+      GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key AND x.value = y.value)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src1 x JOIN src y ON (x.key = y.key AND x.value = y.value)
+      GROUP BY x.key) tmp;

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer10.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer10.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer10.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer10.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,130 @@
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery xx and xx join yy.
+-- This case is used to test LEFT SEMI JOIN since Hive will
+-- introduce a GroupByOperator before the ReduceSinkOperator of
+-- the right table (yy in queries below)
+-- of LEFT SEMI JOIN.
+EXPLAIN
+SELECT xx.key, xx.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+LEFT SEMI JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt;
+
+SELECT xx.key, xx.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+LEFT SEMI JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+LEFT SEMI JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt;
+
+SELECT xx.key, xx.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+LEFT SEMI JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery xx and xx join yy.
+-- This case is used to test LEFT SEMI JOIN since Hive will
+-- introduce a GroupByOperator before the ReduceSinkOperator of
+-- the right table (yy in queries below)
+-- of LEFT SEMI JOIN.
+EXPLAIN
+SELECT xx.key, xx.value
+FROM
+src1 xx
+LEFT SEMI JOIN 
+(SELECT x.key as key 
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND
+       y.key > 20) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
+
+SELECT xx.key, xx.value
+FROM
+src1 xx
+LEFT SEMI JOIN 
+(SELECT x.key as key 
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND
+       y.key > 20) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.value
+FROM
+src1 xx
+LEFT SEMI JOIN 
+(SELECT x.key as key 
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND
+       y.key > 20) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
+
+SELECT xx.key, xx.value
+FROM
+src1 xx
+LEFT SEMI JOIN 
+(SELECT x.key as key 
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND
+       y.key > 20) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- This test is used to test if we can use shared scan for 
+-- xx, yy:x, and yy:y.
+EXPLAIN
+SELECT xx.key, xx.value
+FROM
+src xx
+LEFT SEMI JOIN 
+(SELECT x.key as key 
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND x.key > 180) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
+
+SELECT xx.key, xx.value
+FROM
+src xx
+LEFT SEMI JOIN 
+(SELECT x.key as key 
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND x.key > 180) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.value
+FROM
+src xx
+LEFT SEMI JOIN 
+(SELECT x.key as key 
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND x.key > 180) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
+
+SELECT xx.key, xx.value
+FROM
+src xx
+LEFT SEMI JOIN 
+(SELECT x.key as key 
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND x.key > 180) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer11.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer11.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer11.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer11.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,73 @@
+set hive.auto.convert.join=false;
+-- Tests in this file are used to make sure Correlation Optimizer
+-- can correctly handle tables with partitions
+
+CREATE TABLE part_table(key string, value string) PARTITIONED BY (partitionId int);
+INSERT OVERWRITE TABLE part_table PARTITION (partitionId=1)
+  SELECT key, value FROM src ORDER BY key, value LIMIT 100;
+INSERT OVERWRITE TABLE part_table PARTITION (partitionId=2)
+  SELECT key, value FROM src1 ORDER BY key, value;
+
+set hive.optimize.correlation=false;
+-- In this case, we should not do shared scan on part_table
+-- because left and right tables of JOIN use different partitions
+-- of part_table. With Correlation Optimizer we will generate
+-- 1 MR job.
+EXPLAIN
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 1 AND
+      y.partitionId = 2
+GROUP BY x.key;
+
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 1 AND
+      y.partitionId = 2
+GROUP BY x.key;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 1 AND
+      y.partitionId = 2
+GROUP BY x.key;
+
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 1 AND
+      y.partitionId = 2
+GROUP BY x.key;
+
+set hive.optimize.correlation=false;
+-- In this case, we should do shared scan on part_table
+-- because left and right tables of JOIN use the same partition
+-- of part_table. With Correlation Optimizer we will generate
+-- 1 MR job.
+EXPLAIN
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 2 AND
+      y.partitionId = 2
+GROUP BY x.key;
+
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 2 AND
+      y.partitionId = 2
+GROUP BY x.key;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 2 AND
+      y.partitionId = 2
+GROUP BY x.key;
+
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 2 AND
+      y.partitionId = 2
+GROUP BY x.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer12.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer12.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer12.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer12.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,9 @@
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=true;
+-- Currently, correlation optimizer does not support PTF operator
+EXPLAIN SELECT xx.key, xx.cnt, yy.key, yy.cnt
+FROM
+(SELECT x.key as key, count(x.value) OVER (PARTITION BY x.key) AS cnt FROM src x) xx
+JOIN
+(SELECT y.key as key, count(y.value) OVER (PARTITION BY y.key) AS cnt FROM src1 y) yy
+ON (xx.key=yy.key);

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer13.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer13.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer13.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer13.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,18 @@
+CREATE TABLE tmp(c1 INT, c2 INT, c3 STRING, c4 STRING);
+INSERT OVERWRITE TABLE tmp
+SELECT x.key, y.key, x.value, y.value FROM src x JOIN src y ON (x.key = y.key);
+
+set hive.optimize.correlation=true;
+-- The query in this file has operators with the same set of keys
+-- but with different sorting orders.
+-- Correlation optimizer currently does not optimize this case.
+-- This case will be optimized later (needs a follow-up jira).
+
+EXPLAIN
+SELECT xx.key1, xx.key2, yy.key1, yy.key2, xx.cnt, yy.cnt
+FROM 
+(SELECT x.c1 AS key1, x.c3 AS key2, count(1) AS cnt FROM tmp x WHERE x.c1 < 120 GROUP BY x.c3, x.c1) xx
+JOIN
+(SELECT x1.c1 AS key1, x1.c3 AS key2, count(1) AS cnt FROM tmp x1 WHERE x1.c2 > 100 GROUP BY x1.c3, x1.c1) yy
+ON (xx.key1 = yy.key1 AND xx.key2 == yy.key2) ORDER BY xx.key1, xx.key2, yy.key1, yy.key2, xx.cnt, yy.cnt;
+

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer14.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer14.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer14.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer14.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,149 @@
+set hive.optimize.reducededuplication=true;
+set hive.optimize.reducededuplication.min.reducer=1;
+set hive.optimize.correlation=true;
+-- This file is used to show plans of queries involving cluster by, distribute by,
+-- order by, and sort by.
+-- Right now, Correlation optimizer checks the most restrictive condition
+-- when determining if a ReduceSinkOperator is not necessary.
+-- This condition is that two ReduceSinkOperators should have same sorting columns,
+-- same partitioning columns, same sorting orders and no conflict on the numbers of reducers. 
+
+-- Distribute by will not be optimized because distribute by does not introduce
+-- sorting columns.
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x DISTRIBUTE BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y DISTRIBUTE BY key) yy
+ON (xx.key=yy.key);
+
+-- Sort by will not be optimized because sort by does not introduce partitioning columns
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x SORT BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y SORT BY key) yy
+ON (xx.key=yy.key);
+
+set hive.optimize.correlation=false;
+-- Distribute by and sort by on the same key(s) should be optimized
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x DISTRIBUTE BY key SORT BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y DISTRIBUTE BY key SORT BY key) yy
+ON (xx.key=yy.key);
+
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x DISTRIBUTE BY key SORT BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y DISTRIBUTE BY key SORT BY key) yy
+ON (xx.key=yy.key);
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x DISTRIBUTE BY key SORT BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y DISTRIBUTE BY key SORT BY key) yy
+ON (xx.key=yy.key);
+
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x DISTRIBUTE BY key SORT BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y DISTRIBUTE BY key SORT BY key) yy
+ON (xx.key=yy.key);
+
+set hive.optimize.correlation=true;
+-- Because for join we use ascending order, if sort by uses descending order,
+-- this query will not be optimized
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x DISTRIBUTE BY key SORT BY key DESC) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y DISTRIBUTE BY key SORT BY key DESC) yy
+ON (xx.key=yy.key);
+
+-- Even if hive.optimize.reducededuplication.min.reducer=1, order by will not be optimized
+-- because order by does not introduce partitioning columns
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x ORDER BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y ORDER BY key) yy
+ON (xx.key=yy.key);
+
+set hive.optimize.correlation=false;
+-- Cluster by will be optimized
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x Cluster BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y Cluster BY key) yy
+ON (xx.key=yy.key);
+
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x Cluster BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y Cluster BY key) yy
+ON (xx.key=yy.key);
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x Cluster BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y Cluster BY key) yy
+ON (xx.key=yy.key);
+
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x Cluster BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y Cluster BY key) yy
+ON (xx.key=yy.key);
+
+set hive.optimize.correlation=false;
+-- If hive.optimize.reducededuplication.min.reducer=1,
+-- group by and then order by should be optimized
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x CLUSTER BY key) xx
+JOIN
+(SELECT y.key as key, count(*) as value FROM src1 y GROUP BY y.key ORDER BY key) yy
+ON (xx.key=yy.key);
+
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x CLUSTER BY key) xx
+JOIN
+(SELECT y.key as key, count(*) as value FROM src1 y GROUP BY y.key ORDER BY key) yy
+ON (xx.key=yy.key);
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x CLUSTER BY key) xx
+JOIN
+(SELECT y.key as key, count(*) as value FROM src1 y GROUP BY y.key ORDER BY key) yy
+ON (xx.key=yy.key);
+
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x CLUSTER BY key) xx
+JOIN
+(SELECT y.key as key, count(*) as value FROM src1 y GROUP BY y.key ORDER BY key) yy
+ON (xx.key=yy.key);

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer2.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer2.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer2.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,188 @@
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- In this query, subquery a and b both have a GroupByOperator and the a and b will be
+-- joined. The key of JoinOperator is the same with both keys of GroupByOperators in subquery
+-- a and b. When Correlation Optimizer is turned off, we have four MR jobs.
+-- When Correlation Optimizer is turned on, 2 MR jobs will be generated.
+-- The first job will evaluate subquery tmp (including subquery a, b, and the JoinOperator on a
+-- and b).
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=false;
+-- Left Outer Join should be handled.
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      LEFT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      LEFT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      LEFT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      LEFT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=false;
+-- Right Outer Join should be handled.
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      RIGHT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      RIGHT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      RIGHT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      RIGHT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=false;
+-- Full Outer Join should be handled.
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=false;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT a.key AS key, count(1) AS cnt
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)
+      GROUP BY a.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT a.key AS key, count(1) AS cnt
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)
+      GROUP BY a.key) tmp;
+
+set hive.optimize.correlation=true;
+-- After FULL OUTER JOIN, keys with null values are not grouped, right now,
+-- we have to generate 2 MR jobs for tmp, 1 MR job for a join b and another for the
+-- GroupByOperator on key.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT a.key AS key, count(1) AS cnt
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)
+      GROUP BY a.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT a.key AS key, count(1) AS cnt
+      FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+      FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+      ON (a.key = b.key)
+      GROUP BY a.key) tmp;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, we need 4 MR jobs.
+-- When Correlation Optimizer is turned on, the subquery of tmp will be evaluated in
+-- a single MR job (including the subquery a, the subquery b, and a join b). So, we
+-- will have 2 MR jobs.
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.val AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key AS key, x.value AS val FROM src1 x JOIN src y ON (x.key = y.key)) a
+      JOIN (SELECT z.key AS key, count(z.value) AS cnt FROM src1 z group by z.key) b
+      ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.val AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key AS key, x.value AS val FROM src1 x JOIN src y ON (x.key = y.key)) a
+      JOIN (SELECT z.key AS key, count(z.value) AS cnt FROM src1 z group by z.key) b
+      ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.val AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key AS key, x.value AS val FROM src1 x JOIN src y ON (x.key = y.key)) a
+      JOIN (SELECT z.key AS key, count(z.value) AS cnt FROM src1 z group by z.key) b
+      ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.val AS cnt1, b.key AS key2, b.cnt AS cnt2
+      FROM (SELECT x.key AS key, x.value AS val FROM src1 x JOIN src y ON (x.key = y.key)) a
+      JOIN (SELECT z.key AS key, count(z.value) AS cnt FROM src1 z group by z.key) b
+      ON (a.key = b.key)) tmp;

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,98 @@
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 5 MR jobs will be generated.
+-- When Correlation Optimizer is turned on, the subquery tmp will be evaluated
+-- in a single MR job (including the subquery b, the subquery d, and b join d).
+-- At the reduce side of the MR job evaluating tmp, two operation paths
+-- (for subquery b and d) have different depths. The path starting from subquery b
+-- is JOIN->GBY->JOIN, which has a depth of 3. While, the path starting from subquery d
+-- is JOIN->JOIN. We should be able to handle this case.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT b.key AS key, b.cnt AS cnt, d.value AS value
+      FROM (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+      JOIN (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+      ON b.key = d.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT b.key AS key, b.cnt AS cnt, d.value AS value
+      FROM (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+      JOIN (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+      ON b.key = d.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT b.key AS key, b.cnt AS cnt, d.value AS value
+      FROM (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+      JOIN (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+      ON b.key = d.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT b.key AS key, b.cnt AS cnt, d.value AS value
+      FROM (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+      JOIN (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+      ON b.key = d.key) tmp;
+
+set hive.optimize.correlation=true;
+set hive.auto.convert.join=true;
+set hive.optimize.mapjoin.mapreduce=true;
+-- Enable hive.auto.convert.join.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT b.key AS key, b.cnt AS cnt, d.value AS value
+      FROM (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+      JOIN (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+      ON b.key = d.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT b.key AS key, b.cnt AS cnt, d.value AS value
+      FROM (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+      JOIN (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+      ON b.key = d.key) tmp;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT d.key AS key, d.cnt AS cnt, b.value as value
+      FROM (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) b
+      JOIN (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) d
+      ON b.key = d.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT d.key AS key, d.cnt AS cnt, b.value as value
+      FROM (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) b
+      JOIN (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) d
+      ON b.key = d.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT d.key AS key, d.cnt AS cnt, b.value as value
+      FROM (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) b
+      JOIN (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) d
+      ON b.key = d.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT d.key AS key, d.cnt AS cnt, b.value as value
+      FROM (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) b
+      JOIN (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) d
+      ON b.key = d.key) tmp;
+      
+set hive.optimize.correlation=true;
+set hive.auto.convert.join=true;
+set hive.optimize.mapjoin.mapreduce=true;
+-- Enable hive.auto.convert.join.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT d.key AS key, d.cnt AS cnt, b.value as value
+      FROM (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) b
+      JOIN (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) d
+      ON b.key = d.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT d.key AS key, d.cnt AS cnt, b.value as value
+      FROM (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) b
+      JOIN (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) d
+      ON b.key = d.key) tmp;

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,159 @@
+CREATE TABLE T1(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+CREATE TABLE T2(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+CREATE TABLE T3(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T3;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, this query will be evaluated
+-- by 3 MR jobs. 
+-- When Correlation Optimizer is turned on, this query will be evaluated by
+-- 2 MR jobs. The subquery tmp will be evaluated in a single MR job.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key)
+      GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key)
+      GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key)
+      GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key)
+      GROUP BY y.key) tmp;
+      
+set hive.optimize.correlation=true;
+set hive.auto.convert.join=true;
+set hive.optimize.mapjoin.mapreduce=true;
+-- Enable hive.auto.convert.join.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key)
+      GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key)
+      GROUP BY y.key) tmp;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- This case should be optimized, since the key of GroupByOperator is from the leftmost table
+-- of a chain of LEFT OUTER JOINs.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+-- This query will not be optimized by correlation optimizer because
+-- GroupByOperator uses y.key (a right table of a left outer join)
+-- as the key.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=false;
+-- This case should be optimized, since the key of GroupByOperator is from the rightmost table
+-- of a chain of RIGHT OUTER JOINs.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT z.key AS key, count(1) AS cnt
+      FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY z.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT z.key AS key, count(1) AS cnt
+      FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY z.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT z.key AS key, count(1) AS cnt
+      FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY z.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT z.key AS key, count(1) AS cnt
+      FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY z.key) tmp;
+
+set hive.optimize.correlation=true;
+-- This query will not be optimized by correlation optimizer because
+-- GroupByOperator uses y.key (a left table of a right outer join)
+-- as the key.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=false;
+-- This case should not be optimized because after the FULL OUTER JOIN, rows with null keys
+-- are not grouped.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM T2 x FULL OUTER JOIN T1 y ON (x.key = y.key) FULL OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM T2 x FULL OUTER JOIN T1 y ON (x.key = y.key) FULL OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM T2 x FULL OUTER JOIN T1 y ON (x.key = y.key) FULL OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+      FROM T2 x FULL OUTER JOIN T1 y ON (x.key = y.key) FULL OUTER JOIN T3 z ON (y.key = z.key)
+      GROUP BY y.key) tmp;

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,80 @@
+CREATE TABLE T1(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/kv1.txt' INTO TABLE T1;
+CREATE TABLE T2(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/kv2.txt' INTO TABLE T2;
+CREATE TABLE T3(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/kv3.txt' INTO TABLE T3;
+CREATE TABLE T4(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/kv5.txt' INTO TABLE T4;
+
+CREATE TABLE dest_co1(key INT, val STRING);
+CREATE TABLE dest_co2(key INT, val STRING);
+CREATE TABLE dest_co3(key INT, val STRING);
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 3 MR jobs are needed.
+-- When Correlation Optimizer is turned on, only a single MR job is needed.
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT b.key, d.val
+FROM
+(SELECT x.key, x.val FROM T1 x JOIN T2 y ON (x.key = y.key)) b
+JOIN
+(SELECT m.key, n.val FROM T3 m JOIN T4 n ON (m.key = n.key)) d
+ON b.key = d.key;
+
+INSERT OVERWRITE TABLE dest_co1
+SELECT b.key, d.val
+FROM
+(SELECT x.key, x.val FROM T1 x JOIN T2 y ON (x.key = y.key)) b
+JOIN
+(SELECT m.key, n.val FROM T3 m JOIN T4 n ON (m.key = n.key)) d
+ON b.key = d.key;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT b.key, d.val
+FROM
+(SELECT x.key, x.val FROM T1 x JOIN T2 y ON (x.key = y.key)) b
+JOIN
+(SELECT m.key, n.val FROM T3 m JOIN T4 n ON (m.key = n.key)) d
+ON b.key = d.key;
+
+INSERT OVERWRITE TABLE dest_co2
+SELECT b.key, d.val
+FROM
+(SELECT x.key, x.val FROM T1 x JOIN T2 y ON (x.key = y.key)) b
+JOIN
+(SELECT m.key, n.val FROM T3 m JOIN T4 n ON (m.key = n.key)) d
+ON b.key = d.key;
+
+set hive.optimize.correlation=true;
+set hive.auto.convert.join=true;
+set hive.optimize.mapjoin.mapreduce=true;
+set hive.auto.convert.join.noconditionaltask.size=10000000000;
+-- Enable hive.auto.convert.join.
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co3
+SELECT b.key, d.val
+FROM
+(SELECT x.key, x.val FROM T1 x JOIN T2 y ON (x.key = y.key)) b
+JOIN
+(SELECT m.key, n.val FROM T3 m JOIN T4 n ON (m.key = n.key)) d
+ON b.key = d.key;
+
+INSERT OVERWRITE TABLE dest_co3
+SELECT b.key, d.val
+FROM
+(SELECT x.key, x.val FROM T1 x JOIN T2 y ON (x.key = y.key)) b
+JOIN
+(SELECT m.key, n.val FROM T3 m JOIN T4 n ON (m.key = n.key)) d
+ON b.key = d.key;
+
+-- dest_co1, dest_co2 and dest_co3 should be same
+-- SELECT * FROM dest_co1 x ORDER BY x.key, x.val;
+-- SELECT * FROM dest_co2 x ORDER BY x.key, x.val;
+SELECT SUM(HASH(key)), SUM(HASH(val)) FROM dest_co1;
+SELECT SUM(HASH(key)), SUM(HASH(val)) FROM dest_co2;
+SELECT SUM(HASH(key)), SUM(HASH(val)) FROM dest_co3;

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer6.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer6.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer6.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer6.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,324 @@
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 6 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery xx, subquery yy, and xx join yy.
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.cnt;
+
+SELECT xx.key, xx.cnt, yy.key, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.cnt;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.cnt;
+
+SELECT xx.key, xx.cnt, yy.key, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.cnt;
+
+set hive.optimize.correlation=true;
+set hive.auto.convert.join=true;
+set hive.optimize.mapjoin.mapreduce=true;
+-- Enable hive.auto.convert.join.
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.cnt;
+
+SELECT xx.key, xx.cnt, yy.key, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.cnt;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 3 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery yy and xx join yy.
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x GROUP BY x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x GROUP BY x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x GROUP BY x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x GROUP BY x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery yy and xx join yy.
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery xx and xx join yy.
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key;
+
+SELECT xx.key, xx.cnt, yy.key
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key;
+
+SELECT xx.key, xx.cnt, yy.key
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery xx and xx join yy join zz.
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN src zz ON xx.key=zz.key
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON zz.key=yy.key 
+ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN src zz ON xx.key=zz.key
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON zz.key=yy.key 
+ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.optimize.correlation=true;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery yy and xx join yy join zz.
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN src zz ON xx.key=zz.key
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON zz.key=yy.key 
+ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN src zz ON xx.key=zz.key
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON zz.key=yy.key 
+ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery yy and xx join yy join zz.
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key JOIN src zz
+ON yy.key=zz.key ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key JOIN src zz
+ON yy.key=zz.key ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key JOIN src zz
+ON yy.key=zz.key ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key JOIN src zz
+ON yy.key=zz.key ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 6 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery tmp and tmp join z.
+EXPLAIN
+SELECT tmp.key, tmp.sum1, tmp.sum2, z.key, z.value
+FROM
+(SELECT xx.key as key, sum(xx.cnt) as sum1, sum(yy.cnt) as sum2
+ FROM (SELECT x.key as key, count(*) AS cnt FROM src x group by x.key) xx
+ JOIN (SELECT y.key as key, count(*) AS cnt FROM src1 y group by y.key) yy
+ ON (xx.key=yy.key) GROUP BY xx.key) tmp
+JOIN src z ON tmp.key=z.key
+ORDER BY tmp.key, tmp.sum1, tmp.sum2, z.key, z.value;
+
+SELECT tmp.key, tmp.sum1, tmp.sum2, z.key, z.value
+FROM
+(SELECT xx.key as key, sum(xx.cnt) as sum1, sum(yy.cnt) as sum2
+ FROM (SELECT x.key as key, count(*) AS cnt FROM src x group by x.key) xx
+ JOIN (SELECT y.key as key, count(*) AS cnt FROM src1 y group by y.key) yy
+ ON (xx.key=yy.key) GROUP BY xx.key) tmp
+JOIN src z ON tmp.key=z.key
+ORDER BY tmp.key, tmp.sum1, tmp.sum2, z.key, z.value;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT tmp.key, tmp.sum1, tmp.sum2, z.key, z.value
+FROM
+(SELECT xx.key as key, sum(xx.cnt) as sum1, sum(yy.cnt) as sum2
+ FROM (SELECT x.key as key, count(*) AS cnt FROM src x group by x.key) xx
+ JOIN (SELECT y.key as key, count(*) AS cnt FROM src1 y group by y.key) yy
+ ON (xx.key=yy.key) GROUP BY xx.key) tmp
+JOIN src z ON tmp.key=z.key
+ORDER BY tmp.key, tmp.sum1, tmp.sum2, z.key, z.value;
+
+SELECT tmp.key, tmp.sum1, tmp.sum2, z.key, z.value
+FROM
+(SELECT xx.key as key, sum(xx.cnt) as sum1, sum(yy.cnt) as sum2
+ FROM (SELECT x.key as key, count(*) AS cnt FROM src x group by x.key) xx
+ JOIN (SELECT y.key as key, count(*) AS cnt FROM src1 y group by y.key) yy
+ ON (xx.key=yy.key) GROUP BY xx.key) tmp
+JOIN src z ON tmp.key=z.key
+ORDER BY tmp.key, tmp.sum1, tmp.sum2, z.key, z.value;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 6 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 4 MR jobs are needed.
+-- 2 MR jobs are used to evaluate yy, 1 MR job is used to evaluate xx and xx join yy.
+-- The last MR is used for ordering. 
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.value, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, x.value as value, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key, x.value) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value, yy.cnt;
+
+SELECT xx.key, xx.cnt, yy.key, yy.value, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, x.value as value, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key, x.value) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value, yy.cnt;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.value, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, x.value as value, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key, x.value) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value, yy.cnt;
+
+SELECT xx.key, xx.cnt, yy.key, yy.value, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, x.value as value, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key, x.value) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value, yy.cnt;
+
+set hive.optimize.correlation=true;
+set hive.auto.convert.join=true;
+set hive.optimize.mapjoin.mapreduce=true;
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.value, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, x.value as value, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key, x.value) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value, yy.cnt;
+
+SELECT xx.key, xx.cnt, yy.key, yy.value, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, x.value as value, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key, x.value) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value, yy.cnt;
+

Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer7.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer7.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer7.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer7.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,82 @@
+set hive.auto.convert.join=true;
+
+set hive.optimize.correlation=false;
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src x JOIN src1 y ON (x.key = y.key)
+      GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src x JOIN src1 y ON (x.key = y.key)
+      GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src x JOIN src1 y ON (x.key = y.key)
+      GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src x JOIN src1 y ON (x.key = y.key)
+      GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=10000000000;
+set hive.optimize.mapjoin.mapreduce=true;
+
+set hive.optimize.correlation=false;
+-- Without correlation optimizer, we will have 3 MR jobs.
+-- The first one is a MapJoin and Aggregation (in the Reduce Phase).
+-- The second one is another MapJoin. The third one is for ordering.
+-- With the correlation optimizer, right now, we still have
+-- 3 MR jobs. The first one is a MapJoin and the map-side aggregation (a map-only job).
+-- The second one have the reduce-side aggregation and the second join.
+-- The third one is for ordering.
+-- Although we have turned on hive.optimize.mapjoin.mapreduce, that optimizer
+-- cannot handle the case where the MR job (the one into which a map-only job will be merged)
+-- has multiple inputs. We should improve that optimizer.
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src x JOIN src1 y ON (x.key = y.key)
+      GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src x JOIN src1 y ON (x.key = y.key)
+      GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src x JOIN src1 y ON (x.key = y.key)
+      GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+      FROM src x JOIN src1 y ON (x.key = y.key)
+      GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+