You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/07/18 11:16:53 UTC
svn commit: r1504395 [3/15] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/if/
ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql...
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1504395&r1=1504394&r2=1504395&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Thu Jul 18 09:16:52 2013
@@ -416,6 +416,22 @@ public class CommonJoinTaskDispatcher ex
copyReducerConf(mapJoinTask, childTask);
}
+ /**
+ * Decides whether the given alias cannot serve as the big (streamed) table
+ * of a map join.
+ *
+ * @param bigTableAlias the alias being considered as the big table
+ * @param aliasToSize known input size per alias; may lack an entry for the alias
+ * @param aliasTotalKnownInputSize total known input size across all aliases
+ * @param ThresholdOfSmallTblSizeSum maximum allowed combined size of the small tables
+ * @return true if the alias' size is known and positive and the combined known
+ * size of the remaining (small) tables exceeds the threshold; false
+ * otherwise, including when the alias' size is unknown or non-positive
+ */
+ public static boolean cannotConvert(String bigTableAlias,
+ Map<String, Long> aliasToSize, long aliasTotalKnownInputSize,
+ long ThresholdOfSmallTblSizeSum) {
+ boolean ret = false;
+ Long aliasKnownSize = aliasToSize.get(bigTableAlias);
+ if (aliasKnownSize != null && aliasKnownSize.longValue() > 0) {
+ // Everything except the candidate big table must fit the small-table budget.
+ long smallTblTotalKnownSize = aliasTotalKnownInputSize
+ - aliasKnownSize.longValue();
+ if (smallTblTotalKnownSize > ThresholdOfSmallTblSizeSum) {
+ // This table is not a good choice for the big table.
+ ret = true;
+ }
+ }
+ return ret;
+ }
+
@Override
public Task<? extends Serializable> processCurrentTask(MapRedTask currTask,
ConditionalTask conditionalTask, Context context)
@@ -564,14 +580,9 @@ public class CommonJoinTaskDispatcher ex
MapRedTask newTask = newTaskAlias.getFirst();
bigTableAlias = newTaskAlias.getSecond();
- Long aliasKnownSize = aliasToSize.get(bigTableAlias);
- if (aliasKnownSize != null && aliasKnownSize.longValue() > 0) {
- long smallTblTotalKnownSize = aliasTotalKnownInputSize
- - aliasKnownSize.longValue();
- if (smallTblTotalKnownSize > ThresholdOfSmallTblSizeSum) {
- // this table is not good to be a big table.
- continue;
- }
+ if (cannotConvert(bigTableAlias, aliasToSize,
+ aliasTotalKnownInputSize, ThresholdOfSmallTblSizeSum)) {
+ continue;
}
// add into conditional task
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1504395&r1=1504394&r2=1504395&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Thu Jul 18 09:16:52 2013
@@ -3499,7 +3499,8 @@ public class SemanticAnalyzer extends Ba
}
List<List<Integer>> distinctColIndices = getDistinctColIndicesForReduceSink(parseInfo, dest,
- reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames);
+ reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames,
+ colExprMap);
ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
HashMap<String, ASTNode> aggregationTrees = parseInfo
@@ -3572,7 +3573,8 @@ public class SemanticAnalyzer extends Ba
private List<List<Integer>> getDistinctColIndicesForReduceSink(QBParseInfo parseInfo,
String dest,
List<ExprNodeDesc> reduceKeys, RowResolver reduceSinkInputRowResolver,
- RowResolver reduceSinkOutputRowResolver, List<String> outputKeyColumnNames)
+ RowResolver reduceSinkOutputRowResolver, List<String> outputKeyColumnNames,
+ Map<String, ExprNodeDesc> colExprMap)
throws SemanticException {
List<List<Integer>> distinctColIndices = new ArrayList<List<Integer>>();
@@ -3611,6 +3613,7 @@ public class SemanticAnalyzer extends Ba
ColumnInfo colInfo = new ColumnInfo(field, expr.getTypeInfo(), null, false);
reduceSinkOutputRowResolver.putExpression(parameter, colInfo);
numExprs++;
+ colExprMap.put(colInfo.getInternalName(), expr);
}
distinctColIndices.add(distinctIndices);
}
@@ -3672,7 +3675,8 @@ public class SemanticAnalyzer extends Ba
colExprMap);
List<List<Integer>> distinctColIndices = getDistinctColIndicesForReduceSink(parseInfo, dest,
- reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames);
+ reduceKeys, reduceSinkInputRowResolver, reduceSinkOutputRowResolver, outputKeyColumnNames,
+ colExprMap);
ArrayList<ExprNodeDesc> reduceValues = new ArrayList<ExprNodeDesc>();
@@ -6952,6 +6956,7 @@ public class SemanticAnalyzer extends Ba
reduceValues.size() - 1).getTypeInfo(), "", false);
reduceSinkOutputRowResolver.putExpression(grpbyExpr, colInfo);
outputColumnNames.add(getColumnInternalName(reduceValues.size() - 1));
+ colExprMap.put(colInfo.getInternalName(), grpByExprNode);
}
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DemuxDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DemuxDesc.java?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DemuxDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DemuxDesc.java Thu Jul 18 09:16:52 2013
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Demux operator descriptor implementation.
+ *
+ */
+@Explain(displayName = "Demux Operator")
+public class DemuxDesc extends AbstractOperatorDesc {
+
+ private static final long serialVersionUID = 1L;
+
+ // Maps a tag assigned after the plan rewrite back to the operator's original tag.
+ private Map<Integer, Integer> newTagToOldTag;
+ // Maps each new tag to the index of the child operator that consumes rows
+ // carrying that tag. NOTE(review): semantics inferred from the field name —
+ // confirm against DemuxOperator usage.
+ private Map<Integer, Integer> newTagToChildIndex;
+ // Per-tag key serialization descriptors.
+ private List<TableDesc> keysSerializeInfos;
+ // Per-tag value serialization descriptors.
+ private List<TableDesc> valuesSerializeInfos;
+ // For each child index, the number of parents that child had before the
+ // plan was rewritten (presumably used to restore original parent counts —
+ // verify against the optimizer that builds this descriptor).
+ private Map<Integer, Integer> childIndexToOriginalNumParents;
+
+ // No-arg constructor required for plan serialization.
+ public DemuxDesc() {
+ }
+
+ public DemuxDesc(
+ Map<Integer, Integer> newTagToOldTag,
+ Map<Integer, Integer> newTagToChildIndex,
+ Map<Integer, Integer> childIndexToOriginalNumParents,
+ List<TableDesc> keysSerializeInfos,
+ List<TableDesc> valuesSerializeInfos){
+ this.newTagToOldTag = newTagToOldTag;
+ this.newTagToChildIndex = newTagToChildIndex;
+ this.childIndexToOriginalNumParents = childIndexToOriginalNumParents;
+ this.keysSerializeInfos = keysSerializeInfos;
+ this.valuesSerializeInfos = valuesSerializeInfos;
+ }
+
+ public List<TableDesc> getKeysSerializeInfos() {
+ return keysSerializeInfos;
+ }
+
+ public void setKeysSerializeInfos(List<TableDesc> keysSerializeInfos) {
+ this.keysSerializeInfos = keysSerializeInfos;
+ }
+
+ public List<TableDesc> getValuesSerializeInfos() {
+ return valuesSerializeInfos;
+ }
+
+ public void setValuesSerializeInfos(List<TableDesc> valuesSerializeInfos) {
+ this.valuesSerializeInfos = valuesSerializeInfos;
+ }
+
+ public Map<Integer, Integer> getNewTagToOldTag() {
+ return newTagToOldTag;
+ }
+
+ public void setNewTagToOldTag(Map<Integer, Integer> newTagToOldTag) {
+ this.newTagToOldTag = newTagToOldTag;
+ }
+
+ public Map<Integer, Integer> getNewTagToChildIndex() {
+ return newTagToChildIndex;
+ }
+
+ public void setNewTagToChildIndex(Map<Integer, Integer> newTagToChildIndex) {
+ this.newTagToChildIndex = newTagToChildIndex;
+ }
+
+ public Map<Integer, Integer> getChildIndexToOriginalNumParents() {
+ return childIndexToOriginalNumParents;
+ }
+
+ public void setChildIndexToOriginalNumParents(
+ Map<Integer, Integer> childIndexToOriginalNumParents) {
+ this.childIndexToOriginalNumParents = childIndexToOriginalNumParents;
+ }
+}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MuxDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MuxDesc.java?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MuxDesc.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MuxDesc.java Thu Jul 18 09:16:52 2013
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+
+
+/**
+ * Mux operator descriptor implementation.
+ *
+ */
+@Explain(displayName = "Mux Operator")
+public class MuxDesc extends AbstractOperatorDesc {
+ private static final long serialVersionUID = 1L;
+
+ // For each parent (by list position): the key expressions taken from that
+ // parent's ReduceSinkDesc, or null if the parent was not a ReduceSinkOperator.
+ private List<List<ExprNodeDesc>> parentToKeyCols;
+ // For each parent: the value expressions from its ReduceSinkDesc, or null.
+ private List<List<ExprNodeDesc>> parentToValueCols;
+ // For each parent: the output key column names from its ReduceSinkDesc, or null.
+ private List<List<String>> parentToOutputKeyColumnNames;
+ // For each parent: the output value column names from its ReduceSinkDesc, or null.
+ private List<List<String>> parentToOutputValueColumnNames;
+ // For each parent: the tag from its ReduceSinkDesc, or null.
+ private List<Integer> parentToTag;
+ // Maps a parent index after the plan rewrite to the original parent index.
+ // NOTE(review): populated externally via the setter; semantics inferred from
+ // the field name — confirm against the optimizer that builds this descriptor.
+ private Map<Integer, Integer> newParentIndexToOldParentIndex;
+
+ // No-arg constructor required for plan serialization.
+ public MuxDesc() {
+ }
+
+ // A MuxDesc is only generated from a corresponding ReduceSinkDesc.
+ // Each per-parent list below has one entry per operator in ops; entries for
+ // operators that are not ReduceSinkOperators (or are null) are filled with null.
+ public MuxDesc(List<Operator<? extends OperatorDesc>> ops){
+ int numParents = ops.size();
+ parentToKeyCols = new ArrayList<List<ExprNodeDesc>>(numParents);
+ parentToOutputKeyColumnNames = new ArrayList<List<String>>(numParents);
+ parentToValueCols = new ArrayList<List<ExprNodeDesc>>(numParents);
+ parentToOutputValueColumnNames = new ArrayList<List<String>>(numParents);
+ parentToTag = new ArrayList<Integer>(numParents);
+
+ for (Operator<? extends OperatorDesc> op: ops) {
+ if (op != null && op instanceof ReduceSinkOperator) {
+ // Copy the key/value layout of the parent ReduceSink so the Mux can
+ // present the same row layout downstream.
+ ReduceSinkOperator rsop = (ReduceSinkOperator)op;
+ List<ExprNodeDesc> keyCols = rsop.getConf().getKeyCols();
+ List<ExprNodeDesc> valueCols = rsop.getConf().getValueCols();
+ List<String> outputKeyColumnNames = rsop.getConf().getOutputKeyColumnNames();
+ List<String> outputValueColumnNames = rsop.getConf().getOutputValueColumnNames();
+ int tag = rsop.getConf().getTag();
+ parentToKeyCols.add(keyCols);
+ parentToValueCols.add(valueCols);
+ parentToOutputKeyColumnNames.add(outputKeyColumnNames);
+ parentToOutputValueColumnNames.add(outputValueColumnNames);
+ parentToTag.add(tag);
+ } else {
+ // Keep the list positions aligned with ops even for non-ReduceSink parents.
+ parentToKeyCols.add(null);
+ parentToValueCols.add(null);
+ parentToOutputKeyColumnNames.add(null);
+ parentToOutputValueColumnNames.add(null);
+ parentToTag.add(null);
+ }
+ }
+ }
+
+ public List<List<ExprNodeDesc>> getParentToKeyCols() {
+ return parentToKeyCols;
+ }
+
+ public void setParentToKeyCols(List<List<ExprNodeDesc>> parentToKeyCols) {
+ this.parentToKeyCols = parentToKeyCols;
+ }
+
+ public List<List<ExprNodeDesc>> getParentToValueCols() {
+ return parentToValueCols;
+ }
+
+ public void setParentToValueCols(List<List<ExprNodeDesc>> parentToValueCols) {
+ this.parentToValueCols = parentToValueCols;
+ }
+
+ public List<List<String>> getParentToOutputKeyColumnNames() {
+ return parentToOutputKeyColumnNames;
+ }
+
+ public void setParentToOutputKeyColumnNames(
+ List<List<String>> parentToOutputKeyColumnNames) {
+ this.parentToOutputKeyColumnNames = parentToOutputKeyColumnNames;
+ }
+
+ public List<List<String>> getParentToOutputValueColumnNames() {
+ return parentToOutputValueColumnNames;
+ }
+
+ public void setParentToOutputValueColumnNames(
+ List<List<String>> parentToOutputValueColumnNames) {
+ this.parentToOutputValueColumnNames = parentToOutputValueColumnNames;
+ }
+
+ public List<Integer> getParentToTag() {
+ return parentToTag;
+ }
+
+ public void setParentToTag(List<Integer> parentToTag) {
+ this.parentToTag = parentToTag;
+ }
+
+ public Map<Integer, Integer> getNewParentIndexToOldParentIndex() {
+ return newParentIndexToOldParentIndex;
+ }
+
+ public void setNewParentIndexToOldParentIndex(
+ Map<Integer, Integer> newParentIndexToOldParentIndex) {
+ this.newParentIndexToOldParentIndex = newParentIndexToOldParentIndex;
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=1504395&r1=1504394&r2=1504395&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Thu Jul 18 09:16:52 2013
@@ -71,13 +71,13 @@ public class ReduceSinkDesc extends Abst
public ReduceSinkDesc() {
}
- public ReduceSinkDesc(java.util.ArrayList<ExprNodeDesc> keyCols,
+ public ReduceSinkDesc(ArrayList<ExprNodeDesc> keyCols,
int numDistributionKeys,
- java.util.ArrayList<ExprNodeDesc> valueCols,
- java.util.ArrayList<java.lang.String> outputKeyColumnNames,
+ ArrayList<ExprNodeDesc> valueCols,
+ ArrayList<String> outputKeyColumnNames,
List<List<Integer>> distinctColumnIndices,
- java.util.ArrayList<java.lang.String> outputValueColumnNames, int tag,
- java.util.ArrayList<ExprNodeDesc> partitionCols, int numReducers,
+ ArrayList<String> outputValueColumnNames, int tag,
+ ArrayList<ExprNodeDesc> partitionCols, int numReducers,
final TableDesc keySerializeInfo, final TableDesc valueSerializeInfo) {
this.keyCols = keyCols;
this.numDistributionKeys = numDistributionKeys;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionDesc.java?rev=1504395&r1=1504394&r2=1504395&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionDesc.java Thu Jul 18 09:16:52 2013
@@ -26,12 +26,16 @@ package org.apache.hadoop.hive.ql.plan;
@Explain(displayName = "Union")
public class UnionDesc extends AbstractOperatorDesc {
private static final long serialVersionUID = 1L;
-
private transient int numInputs;
+ // Whether this UnionOperator is inside the reduce side of an MR job generated
+ // by the Correlation Optimizer, meaning all inputs of this UnionOperator come
+ // from a DemuxOperator. If so, we should not touch this UnionOperator in genMapRedTasks.
+ private transient boolean allInputsInSameReducer;
@SuppressWarnings("nls")
public UnionDesc() {
numInputs = 2;
+ allInputsInSameReducer = false;
}
/**
@@ -48,4 +52,12 @@ public class UnionDesc extends AbstractO
public void setNumInputs(int numInputs) {
this.numInputs = numInputs;
}
+
+ public boolean isAllInputsInSameReducer() {
+ return allInputsInSameReducer;
+ }
+
+ public void setAllInputsInSameReducer(boolean allInputsInSameReducer) {
+ this.allInputsInSameReducer = allInputsInSameReducer;
+ }
}
Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer1.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,281 @@
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- This query has a GroupByOperator following a JoinOperator and they share the same keys.
+-- When Correlation Optimizer is turned off, three MR jobs will be generated.
+-- When Correlation Optimizer is turned on, two MR jobs will be generated
+-- and JoinOperator (on the column of key) and GroupByOperator (also on the column
+-- of key) will be executed in the first MR job.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+set hive.auto.convert.join=true;
+set hive.optimize.correlation=true;
+-- Enable hive.auto.convert.join.
+-- Correlation Optimizer will detect that the join will be converted to a Map-join,
+-- so it will not try to optimize this query.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+set hive.auto.convert.join=true;
+set hive.optimize.mapjoin.mapreduce=true;
+set hive.optimize.correlation=true;
+-- Enable hive.auto.convert.join.
+-- Correlation Optimizer will detect that the join will be converted to a Map-join,
+-- so it will not try to optimize this query.
+-- We should generate 1 MR job for subquery tmp.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- If the key of a GroupByOperator is the left table's key in
+-- a Left Semi Join, these two operators will be executed in
+-- the same MR job when Correlation Optimizer is enabled.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x LEFT SEMI JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x LEFT SEMI JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x LEFT SEMI JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x LEFT SEMI JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- If the key of a GroupByOperator is the left table's key in
+-- a Left Outer Join, these two operators will be executed in
+-- the same MR job when Correlation Optimizer is enabled.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+
+set hive.optimize.correlation=false;
+-- If the key of a GroupByOperator is the right table's key in
+-- a Left Outer Join, we cannot use a single MR to execute these two
+-- operators because those keys with a null value are not grouped.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM src1 x LEFT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=false;
+-- If the key of a GroupByOperator is the right table's key in
+-- a Right Outer Join, these two operators will be executed in
+-- the same MR job when Correlation Optimizer is enabled.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY y.key) tmp;
+
+
+set hive.optimize.correlation=false;
+-- If the key of a GroupByOperator is the left table's key in
+-- a Right Outer Join, we cannot use a single MR to execute these two
+-- operators because those keys with a null value are not grouped.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x RIGHT OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=false;
+-- This query has a Full Outer Join followed by a GroupByOperator and
+-- they share the same key. Because those keys with a null value are not grouped
+-- in the output of the Full Outer Join, we cannot use a single MR to execute
+-- these two operators.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x FULL OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x FULL OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x FULL OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x FULL OUTER JOIN src y ON (x.key = y.key)
+ GROUP BY x.key) tmp;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- Currently, we only handle exactly same keys, this query will not be optimized
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.value)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, x.value AS value, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key)
+ GROUP BY x.key, x.value) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.value)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, x.value AS value, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key)
+ GROUP BY x.key, x.value) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.value)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, x.value AS value, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key)
+ GROUP BY x.key, x.value) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.value)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, x.value AS value, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key)
+ GROUP BY x.key, x.value) tmp;
+
+set hive.optimize.correlation=false;
+-- Currently, we only handle exactly same keys, this query will not be optimized
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key AND x.value = y.value)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key AND x.value = y.value)
+ GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key AND x.value = y.value)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src1 x JOIN src y ON (x.key = y.key AND x.value = y.value)
+ GROUP BY x.key) tmp;
Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer10.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer10.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer10.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer10.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,130 @@
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery xx and xx join yy.
+-- This case is used to test LEFT SEMI JOIN since Hive will
+-- introduce a GroupByOperator before the ReduceSinkOperator of
+-- the right table (yy in queries below)
+-- of LEFT SEMI JOIN.
+EXPLAIN
+SELECT xx.key, xx.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+LEFT SEMI JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt;
+
+SELECT xx.key, xx.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+LEFT SEMI JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+LEFT SEMI JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt;
+
+SELECT xx.key, xx.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+LEFT SEMI JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery xx and xx join yy.
+-- This case is used to test LEFT SEMI JOIN since Hive will
+-- introduce a GroupByOperator before the ReduceSinkOperator of
+-- the right table (yy in queries below)
+-- of LEFT SEMI JOIN.
+EXPLAIN
+SELECT xx.key, xx.value
+FROM
+src1 xx
+LEFT SEMI JOIN
+(SELECT x.key as key
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND
+ y.key > 20) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
+
+SELECT xx.key, xx.value
+FROM
+src1 xx
+LEFT SEMI JOIN
+(SELECT x.key as key
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND
+ y.key > 20) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.value
+FROM
+src1 xx
+LEFT SEMI JOIN
+(SELECT x.key as key
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND
+ y.key > 20) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
+
+SELECT xx.key, xx.value
+FROM
+src1 xx
+LEFT SEMI JOIN
+(SELECT x.key as key
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND
+ y.key > 20) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- This test is used to test if we can use shared scan for
+-- xx, yy:x, and yy:y.
+EXPLAIN
+SELECT xx.key, xx.value
+FROM
+src xx
+LEFT SEMI JOIN
+(SELECT x.key as key
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND x.key > 180) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
+
+SELECT xx.key, xx.value
+FROM
+src xx
+LEFT SEMI JOIN
+(SELECT x.key as key
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND x.key > 180) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.value
+FROM
+src xx
+LEFT SEMI JOIN
+(SELECT x.key as key
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND x.key > 180) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
+
+SELECT xx.key, xx.value
+FROM
+src xx
+LEFT SEMI JOIN
+(SELECT x.key as key
+ FROM src x JOIN src y ON (x.key = y.key)
+ WHERE x.key < 200 AND x.key > 180) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.value;
Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer11.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer11.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer11.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer11.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,73 @@
+set hive.auto.convert.join=false;
+-- Tests in this file are used to make sure Correlation Optimizer
+-- can correctly handle tables with partitions
+
+CREATE TABLE part_table(key string, value string) PARTITIONED BY (partitionId int);
+INSERT OVERWRITE TABLE part_table PARTITION (partitionId=1)
+ SELECT key, value FROM src ORDER BY key, value LIMIT 100;
+INSERT OVERWRITE TABLE part_table PARTITION (partitionId=2)
+ SELECT key, value FROM src1 ORDER BY key, value;
+
+set hive.optimize.correlation=false;
+-- In this case, we should not do shared scan on part_table
+-- because left and right tables of JOIN use different partitions
+-- of part_table. With Correlation Optimizer we will generate
+-- 1 MR job.
+EXPLAIN
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 1 AND
+ y.partitionId = 2
+GROUP BY x.key;
+
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 1 AND
+ y.partitionId = 2
+GROUP BY x.key;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 1 AND
+ y.partitionId = 2
+GROUP BY x.key;
+
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 1 AND
+ y.partitionId = 2
+GROUP BY x.key;
+
+set hive.optimize.correlation=false;
+-- In this case, we should do shared scan on part_table
+-- because left and right tables of JOIN use the same partition
+-- of part_table. With Correlation Optimizer we will generate
+-- 1 MR job.
+EXPLAIN
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 2 AND
+ y.partitionId = 2
+GROUP BY x.key;
+
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 2 AND
+ y.partitionId = 2
+GROUP BY x.key;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 2 AND
+ y.partitionId = 2
+GROUP BY x.key;
+
+SELECT x.key AS key, count(1) AS cnt
+FROM part_table x JOIN part_table y ON (x.key = y.key)
+WHERE x.partitionId = 2 AND
+ y.partitionId = 2
+GROUP BY x.key;
Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer12.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer12.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer12.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer12.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,9 @@
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=true;
+-- Currently, correlation optimizer does not support PTF operator
+EXPLAIN SELECT xx.key, xx.cnt, yy.key, yy.cnt
+FROM
+(SELECT x.key as key, count(x.value) OVER (PARTITION BY x.key) AS cnt FROM src x) xx
+JOIN
+(SELECT y.key as key, count(y.value) OVER (PARTITION BY y.key) AS cnt FROM src1 y) yy
+ON (xx.key=yy.key);
Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer13.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer13.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer13.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer13.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,18 @@
+CREATE TABLE tmp(c1 INT, c2 INT, c3 STRING, c4 STRING);
+INSERT OVERWRITE TABLE tmp
+SELECT x.key, y.key, x.value, y.value FROM src x JOIN src y ON (x.key = y.key);
+
+set hive.optimize.correlation=true;
+-- The query in this file has operators with the same set of keys
+-- but with different sorting orders.
+-- Correlation optimizer currently does not optimize this case.
+-- This case will be optimized later (need a follow-up jira).
+
+EXPLAIN
+SELECT xx.key1, xx.key2, yy.key1, yy.key2, xx.cnt, yy.cnt
+FROM
+(SELECT x.c1 AS key1, x.c3 AS key2, count(1) AS cnt FROM tmp x WHERE x.c1 < 120 GROUP BY x.c3, x.c1) xx
+JOIN
+(SELECT x1.c1 AS key1, x1.c3 AS key2, count(1) AS cnt FROM tmp x1 WHERE x1.c2 > 100 GROUP BY x1.c3, x1.c1) yy
+ON (xx.key1 = yy.key1 AND xx.key2 == yy.key2) ORDER BY xx.key1, xx.key2, yy.key1, yy.key2, xx.cnt, yy.cnt;
+
Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer14.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer14.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer14.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer14.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,149 @@
+set hive.optimize.reducededuplication=true;
+set hive.optimize.reducededuplication.min.reducer=1;
+set hive.optimize.correlation=true;
+-- This file is used to show plans of queries involving cluster by, distribute by,
+-- order by, and sort by.
+-- Right now, Correlation optimizer checks the most restrictive condition
+-- when determining if a ReduceSinkOperator is not necessary.
+-- This condition is that two ReduceSinkOperators should have the same sorting columns,
+-- same partitioning columns, same sorting orders, and no conflict on the numbers of reducers.
+
+-- Distribute by will not be optimized because distribute by does not introduce
+-- sorting columns.
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x DISTRIBUTE BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y DISTRIBUTE BY key) yy
+ON (xx.key=yy.key);
+
+-- Sort by will not be optimized because sort by does not introduce partitioning columns
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x SORT BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y SORT BY key) yy
+ON (xx.key=yy.key);
+
+set hive.optimize.correlation=false;
+-- Distribute by and sort by on the same key(s) should be optimized
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x DISTRIBUTE BY key SORT BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y DISTRIBUTE BY key SORT BY key) yy
+ON (xx.key=yy.key);
+
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x DISTRIBUTE BY key SORT BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y DISTRIBUTE BY key SORT BY key) yy
+ON (xx.key=yy.key);
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x DISTRIBUTE BY key SORT BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y DISTRIBUTE BY key SORT BY key) yy
+ON (xx.key=yy.key);
+
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x DISTRIBUTE BY key SORT BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y DISTRIBUTE BY key SORT BY key) yy
+ON (xx.key=yy.key);
+
+set hive.optimize.correlation=true;
+-- Because for join we use ascending order, if sort by uses descending order,
+-- this query will not be optimized
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x DISTRIBUTE BY key SORT BY key DESC) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y DISTRIBUTE BY key SORT BY key DESC) yy
+ON (xx.key=yy.key);
+
+-- Even if hive.optimize.reducededuplication.min.reducer=1, order by will not be optimized
+-- because order by does not introduce partitioning columns
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x ORDER BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y ORDER BY key) yy
+ON (xx.key=yy.key);
+
+set hive.optimize.correlation=false;
+-- Cluster by will be optimized
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x Cluster BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y Cluster BY key) yy
+ON (xx.key=yy.key);
+
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x Cluster BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y Cluster BY key) yy
+ON (xx.key=yy.key);
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x Cluster BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y Cluster BY key) yy
+ON (xx.key=yy.key);
+
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x Cluster BY key) xx
+JOIN
+(SELECT y.key as key, y.value as value FROM src1 y Cluster BY key) yy
+ON (xx.key=yy.key);
+
+set hive.optimize.correlation=false;
+-- If hive.optimize.reducededuplication.min.reducer=1,
+-- group by and then order by should be optimized
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x CLUSTER BY key) xx
+JOIN
+(SELECT y.key as key, count(*) as value FROM src1 y GROUP BY y.key ORDER BY key) yy
+ON (xx.key=yy.key);
+
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x CLUSTER BY key) xx
+JOIN
+(SELECT y.key as key, count(*) as value FROM src1 y GROUP BY y.key ORDER BY key) yy
+ON (xx.key=yy.key);
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x CLUSTER BY key) xx
+JOIN
+(SELECT y.key as key, count(*) as value FROM src1 y GROUP BY y.key ORDER BY key) yy
+ON (xx.key=yy.key);
+
+SELECT xx.key, xx.value, yy.key, yy.value
+FROM
+(SELECT x.key as key, x.value as value FROM src x CLUSTER BY key) xx
+JOIN
+(SELECT y.key as key, count(*) as value FROM src1 y GROUP BY y.key ORDER BY key) yy
+ON (xx.key=yy.key);
Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer2.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer2.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer2.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,188 @@
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- In this query, subquery a and b both have a GroupByOperator and the a and b will be
+-- joined. The key of JoinOperator is the same with both keys of GroupByOperators in subquery
+-- a and b. When Correlation Optimizer is turned off, we have four MR jobs.
+-- When Correlation Optimizer is turned on, 2 MR jobs will be generated.
+-- The first job will evaluate subquery tmp (including subquery a, b, and the JoinOperator on a
+-- and b).
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=false;
+-- Left Outer Join should be handled.
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ LEFT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ LEFT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ LEFT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ LEFT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=false;
+-- Right Outer Join should be handled.
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ RIGHT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ RIGHT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ RIGHT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ RIGHT OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=false;
+-- Full Outer Join should be handled.
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.cnt AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=false;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT a.key AS key, count(1) AS cnt
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)
+ GROUP BY a.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT a.key AS key, count(1) AS cnt
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)
+ GROUP BY a.key) tmp;
+
+set hive.optimize.correlation=true;
+-- After FULL OUTER JOIN, keys with null values are not grouped, right now,
+-- we have to generate 2 MR jobs for tmp, 1 MR job for a join b and another for the
+-- GroupByOperator on key.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT a.key AS key, count(1) AS cnt
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)
+ GROUP BY a.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT a.key AS key, count(1) AS cnt
+ FROM (SELECT x.key as key, count(x.value) AS cnt FROM src x group by x.key) a
+ FULL OUTER JOIN (SELECT y.key as key, count(y.value) AS cnt FROM src1 y group by y.key) b
+ ON (a.key = b.key)
+ GROUP BY a.key) tmp;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, we need 4 MR jobs.
+-- When Correlation Optimizer is turned on, the subquery of tmp will be evaluated in
+-- a single MR job (including the subquery a, the subquery b, and a join b). So, we
+-- will have 2 MR jobs.
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.val AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key AS key, x.value AS val FROM src1 x JOIN src y ON (x.key = y.key)) a
+ JOIN (SELECT z.key AS key, count(z.value) AS cnt FROM src1 z group by z.key) b
+ ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.val AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key AS key, x.value AS val FROM src1 x JOIN src y ON (x.key = y.key)) a
+ JOIN (SELECT z.key AS key, count(z.value) AS cnt FROM src1 z group by z.key) b
+ ON (a.key = b.key)) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.val AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key AS key, x.value AS val FROM src1 x JOIN src y ON (x.key = y.key)) a
+ JOIN (SELECT z.key AS key, count(z.value) AS cnt FROM src1 z group by z.key) b
+ ON (a.key = b.key)) tmp;
+
+SELECT SUM(HASH(key1)), SUM(HASH(cnt1)), SUM(HASH(key2)), SUM(HASH(cnt2))
+FROM (SELECT a.key AS key1, a.val AS cnt1, b.key AS key2, b.cnt AS cnt2
+ FROM (SELECT x.key AS key, x.value AS val FROM src1 x JOIN src y ON (x.key = y.key)) a
+ JOIN (SELECT z.key AS key, count(z.value) AS cnt FROM src1 z group by z.key) b
+ ON (a.key = b.key)) tmp;
Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer3.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,98 @@
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 5 MR jobs will be generated.
+-- When Correlation Optimizer is turned on, the subquery tmp will be evaluated
+-- in a single MR job (including the subquery b, the subquery d, and b join d).
+-- At the reduce side of the MR job evaluating tmp, two operation paths
+-- (for subquery b and d) have different depths. The path starting from subquery b
+-- is JOIN->GBY->JOIN, which has a depth of 3. While, the path starting from subquery d
+-- is JOIN->JOIN. We should be able to handle this case.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT b.key AS key, b.cnt AS cnt, d.value AS value
+ FROM (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+ JOIN (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+ ON b.key = d.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT b.key AS key, b.cnt AS cnt, d.value AS value
+ FROM (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+ JOIN (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+ ON b.key = d.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT b.key AS key, b.cnt AS cnt, d.value AS value
+ FROM (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+ JOIN (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+ ON b.key = d.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT b.key AS key, b.cnt AS cnt, d.value AS value
+ FROM (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+ JOIN (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+ ON b.key = d.key) tmp;
+
+set hive.optimize.correlation=true;
+set hive.auto.convert.join=true;
+set hive.optimize.mapjoin.mapreduce=true;
+-- Enable hive.auto.convert.join.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT b.key AS key, b.cnt AS cnt, d.value AS value
+ FROM (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+ JOIN (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+ ON b.key = d.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT b.key AS key, b.cnt AS cnt, d.value AS value
+ FROM (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) b
+ JOIN (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) d
+ ON b.key = d.key) tmp;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT d.key AS key, d.cnt AS cnt, b.value as value
+ FROM (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) b
+ JOIN (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) d
+ ON b.key = d.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT d.key AS key, d.cnt AS cnt, b.value as value
+ FROM (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) b
+ JOIN (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) d
+ ON b.key = d.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT d.key AS key, d.cnt AS cnt, b.value as value
+ FROM (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) b
+ JOIN (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) d
+ ON b.key = d.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT d.key AS key, d.cnt AS cnt, b.value as value
+ FROM (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) b
+ JOIN (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) d
+ ON b.key = d.key) tmp;
+
+set hive.optimize.correlation=true;
+set hive.auto.convert.join=true;
+set hive.optimize.mapjoin.mapreduce=true;
+-- Enable hive.auto.convert.join.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT d.key AS key, d.cnt AS cnt, b.value as value
+ FROM (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) b
+ JOIN (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) d
+ ON b.key = d.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt)), SUM(HASH(tmp.value))
+FROM (SELECT d.key AS key, d.cnt AS cnt, b.value as value
+ FROM (SELECT x.key, x.value FROM src1 x JOIN src y ON (x.key = y.key)) b
+ JOIN (SELECT x.key, count(1) AS cnt FROM src1 x JOIN src y ON (x.key = y.key) group by x.key) d
+ ON b.key = d.key) tmp;
Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer4.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,159 @@
+CREATE TABLE T1(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+CREATE TABLE T2(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+CREATE TABLE T3(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T3;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, this query will be evaluated
+-- by 3 MR jobs.
+-- When Correlation Optimizer is turned on, this query will be evaluated by
+-- 2 MR jobs. The subquery tmp will be evaluated in a single MR job.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key)
+ GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key)
+ GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key)
+ GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key)
+ GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=true;
+set hive.auto.convert.join=true;
+set hive.optimize.mapjoin.mapreduce=true;
+-- Enable hive.auto.convert.join.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key)
+ GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM T2 x JOIN T1 y ON (x.key = y.key) JOIN T3 z ON (y.key = z.key)
+ GROUP BY y.key) tmp;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- This case should be optimized, since the key of GroupByOperator is from the leftmost table
+-- of a chain of LEFT OUTER JOINs.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY x.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY x.key) tmp;
+
+set hive.optimize.correlation=true;
+-- This query will not be optimized by correlation optimizer because
+-- GroupByOperator uses y.key (a right table of a left outer join)
+-- as the key.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM T2 x LEFT OUTER JOIN T1 y ON (x.key = y.key) LEFT OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=false;
+-- This case should be optimized, since the key of GroupByOperator is from the rightmost table
+-- of a chain of RIGHT OUTER JOINs.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT z.key AS key, count(1) AS cnt
+ FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY z.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT z.key AS key, count(1) AS cnt
+ FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY z.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT z.key AS key, count(1) AS cnt
+ FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY z.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT z.key AS key, count(1) AS cnt
+ FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY z.key) tmp;
+
+set hive.optimize.correlation=true;
+-- This query will not be optimized by correlation optimizer because
+-- GroupByOperator uses y.key (a left table of a right outer join)
+-- as the key.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM T2 x RIGHT OUTER JOIN T1 y ON (x.key = y.key) RIGHT OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=false;
+-- This case should not be optimized because after the FULL OUTER JOIN, rows with null keys
+-- are not grouped.
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM T2 x FULL OUTER JOIN T1 y ON (x.key = y.key) FULL OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM T2 x FULL OUTER JOIN T1 y ON (x.key = y.key) FULL OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY y.key) tmp;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM T2 x FULL OUTER JOIN T1 y ON (x.key = y.key) FULL OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY y.key) tmp;
+
+SELECT SUM(HASH(tmp.key)), SUM(HASH(tmp.cnt))
+FROM (SELECT y.key AS key, count(1) AS cnt
+ FROM T2 x FULL OUTER JOIN T1 y ON (x.key = y.key) FULL OUTER JOIN T3 z ON (y.key = z.key)
+ GROUP BY y.key) tmp;
Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer5.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,80 @@
+CREATE TABLE T1(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/kv1.txt' INTO TABLE T1;
+CREATE TABLE T2(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/kv2.txt' INTO TABLE T2;
+CREATE TABLE T3(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/kv3.txt' INTO TABLE T3;
+CREATE TABLE T4(key INT, val STRING);
+LOAD DATA LOCAL INPATH '../data/files/kv5.txt' INTO TABLE T4;
+
+CREATE TABLE dest_co1(key INT, val STRING);
+CREATE TABLE dest_co2(key INT, val STRING);
+CREATE TABLE dest_co3(key INT, val STRING);
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 3 MR jobs are needed.
+-- When Correlation Optimizer is turned on, only a single MR job is needed.
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co1
+SELECT b.key, d.val
+FROM
+(SELECT x.key, x.val FROM T1 x JOIN T2 y ON (x.key = y.key)) b
+JOIN
+(SELECT m.key, n.val FROM T3 m JOIN T4 n ON (m.key = n.key)) d
+ON b.key = d.key;
+
+INSERT OVERWRITE TABLE dest_co1
+SELECT b.key, d.val
+FROM
+(SELECT x.key, x.val FROM T1 x JOIN T2 y ON (x.key = y.key)) b
+JOIN
+(SELECT m.key, n.val FROM T3 m JOIN T4 n ON (m.key = n.key)) d
+ON b.key = d.key;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co2
+SELECT b.key, d.val
+FROM
+(SELECT x.key, x.val FROM T1 x JOIN T2 y ON (x.key = y.key)) b
+JOIN
+(SELECT m.key, n.val FROM T3 m JOIN T4 n ON (m.key = n.key)) d
+ON b.key = d.key;
+
+INSERT OVERWRITE TABLE dest_co2
+SELECT b.key, d.val
+FROM
+(SELECT x.key, x.val FROM T1 x JOIN T2 y ON (x.key = y.key)) b
+JOIN
+(SELECT m.key, n.val FROM T3 m JOIN T4 n ON (m.key = n.key)) d
+ON b.key = d.key;
+
+set hive.optimize.correlation=true;
+set hive.auto.convert.join=true;
+set hive.optimize.mapjoin.mapreduce=true;
+set hive.auto.convert.join.noconditionaltask.size=10000000000;
+-- Enable hive.auto.convert.join.
+EXPLAIN
+INSERT OVERWRITE TABLE dest_co3
+SELECT b.key, d.val
+FROM
+(SELECT x.key, x.val FROM T1 x JOIN T2 y ON (x.key = y.key)) b
+JOIN
+(SELECT m.key, n.val FROM T3 m JOIN T4 n ON (m.key = n.key)) d
+ON b.key = d.key;
+
+INSERT OVERWRITE TABLE dest_co3
+SELECT b.key, d.val
+FROM
+(SELECT x.key, x.val FROM T1 x JOIN T2 y ON (x.key = y.key)) b
+JOIN
+(SELECT m.key, n.val FROM T3 m JOIN T4 n ON (m.key = n.key)) d
+ON b.key = d.key;
+
+-- dest_co1, dest_co2 and dest_co3 should be same
+-- SELECT * FROM dest_co1 x ORDER BY x.key, x.val;
+-- SELECT * FROM dest_co2 x ORDER BY x.key, x.val;
+SELECT SUM(HASH(key)), SUM(HASH(val)) FROM dest_co1;
+SELECT SUM(HASH(key)), SUM(HASH(val)) FROM dest_co2;
+SELECT SUM(HASH(key)), SUM(HASH(val)) FROM dest_co3;
Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer6.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer6.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer6.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer6.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,324 @@
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 6 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery xx, subquery yy, and xx join yy.
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.cnt;
+
+SELECT xx.key, xx.cnt, yy.key, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.cnt;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.cnt;
+
+SELECT xx.key, xx.cnt, yy.key, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.cnt;
+
+set hive.optimize.correlation=true;
+set hive.auto.convert.join=true;
+set hive.optimize.mapjoin.mapreduce=true;
+-- Enable hive.auto.convert.join.
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.cnt;
+
+SELECT xx.key, xx.cnt, yy.key, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.cnt;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 3 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery yy and xx join yy.
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x GROUP BY x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x GROUP BY x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x GROUP BY x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x GROUP BY x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery yy and xx join yy.
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.auto.convert.join=false;
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery xx and xx join yy.
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key;
+
+SELECT xx.key, xx.cnt, yy.key
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key;
+
+SELECT xx.key, xx.cnt, yy.key
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN src yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery xx and xx join yy join zz.
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN src zz ON xx.key=zz.key
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON zz.key=yy.key
+ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN src zz ON xx.key=zz.key
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON zz.key=yy.key
+ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.optimize.correlation=true;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery yy and xx join yy join zz.
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN src zz ON xx.key=zz.key
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON zz.key=yy.key
+ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN src zz ON xx.key=zz.key
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON zz.key=yy.key
+ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 4 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery yy and xx join yy join zz.
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key JOIN src zz
+ON yy.key=zz.key ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key JOIN src zz
+ON yy.key=zz.key ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key JOIN src zz
+ON yy.key=zz.key ORDER BY xx.key, yy.key, yy.cnt;
+
+SELECT xx.key, yy.key, yy.cnt
+FROM src1 xx
+JOIN
+(SELECT x.key as key, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key) yy
+ON xx.key=yy.key JOIN src zz
+ON yy.key=zz.key ORDER BY xx.key, yy.key, yy.cnt;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 6 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 2 MR jobs are needed.
+-- The first job will evaluate subquery tmp and tmp join z.
+EXPLAIN
+SELECT tmp.key, tmp.sum1, tmp.sum2, z.key, z.value
+FROM
+(SELECT xx.key as key, sum(xx.cnt) as sum1, sum(yy.cnt) as sum2
+ FROM (SELECT x.key as key, count(*) AS cnt FROM src x group by x.key) xx
+ JOIN (SELECT y.key as key, count(*) AS cnt FROM src1 y group by y.key) yy
+ ON (xx.key=yy.key) GROUP BY xx.key) tmp
+JOIN src z ON tmp.key=z.key
+ORDER BY tmp.key, tmp.sum1, tmp.sum2, z.key, z.value;
+
+SELECT tmp.key, tmp.sum1, tmp.sum2, z.key, z.value
+FROM
+(SELECT xx.key as key, sum(xx.cnt) as sum1, sum(yy.cnt) as sum2
+ FROM (SELECT x.key as key, count(*) AS cnt FROM src x group by x.key) xx
+ JOIN (SELECT y.key as key, count(*) AS cnt FROM src1 y group by y.key) yy
+ ON (xx.key=yy.key) GROUP BY xx.key) tmp
+JOIN src z ON tmp.key=z.key
+ORDER BY tmp.key, tmp.sum1, tmp.sum2, z.key, z.value;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT tmp.key, tmp.sum1, tmp.sum2, z.key, z.value
+FROM
+(SELECT xx.key as key, sum(xx.cnt) as sum1, sum(yy.cnt) as sum2
+ FROM (SELECT x.key as key, count(*) AS cnt FROM src x group by x.key) xx
+ JOIN (SELECT y.key as key, count(*) AS cnt FROM src1 y group by y.key) yy
+ ON (xx.key=yy.key) GROUP BY xx.key) tmp
+JOIN src z ON tmp.key=z.key
+ORDER BY tmp.key, tmp.sum1, tmp.sum2, z.key, z.value;
+
+SELECT tmp.key, tmp.sum1, tmp.sum2, z.key, z.value
+FROM
+(SELECT xx.key as key, sum(xx.cnt) as sum1, sum(yy.cnt) as sum2
+ FROM (SELECT x.key as key, count(*) AS cnt FROM src x group by x.key) xx
+ JOIN (SELECT y.key as key, count(*) AS cnt FROM src1 y group by y.key) yy
+ ON (xx.key=yy.key) GROUP BY xx.key) tmp
+JOIN src z ON tmp.key=z.key
+ORDER BY tmp.key, tmp.sum1, tmp.sum2, z.key, z.value;
+
+set hive.optimize.correlation=false;
+-- When Correlation Optimizer is turned off, 6 MR jobs are needed.
+-- When Correlation Optimizer is turned on, 4 MR jobs are needed.
+-- 2 MR jobs are used to evaluate yy, 1 MR is used to evaluate xx and xx join yy.
+-- The last MR is used for ordering.
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.value, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, x.value as value, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key, x.value) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value, yy.cnt;
+
+SELECT xx.key, xx.cnt, yy.key, yy.value, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, x.value as value, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key, x.value) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value, yy.cnt;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.value, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, x.value as value, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key, x.value) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value, yy.cnt;
+
+SELECT xx.key, xx.cnt, yy.key, yy.value, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, x.value as value, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key, x.value) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value, yy.cnt;
+
+set hive.optimize.correlation=true;
+set hive.auto.convert.join=true;
+set hive.optimize.mapjoin.mapreduce=true;
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.value, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, x.value as value, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key, x.value) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value, yy.cnt;
+
+SELECT xx.key, xx.cnt, yy.key, yy.value, yy.cnt
+FROM
+(SELECT x.key as key, count(1) as cnt FROM src1 x JOIN src1 y ON (x.key = y.key) group by x.key) xx
+JOIN
+(SELECT x.key as key, x.value as value, count(1) as cnt FROM src x JOIN src y ON (x.key = y.key) group by x.key, x.value) yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value, yy.cnt;
+
Added: hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer7.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer7.q?rev=1504395&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer7.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/correlationoptimizer7.q Thu Jul 18 09:16:52 2013
@@ -0,0 +1,82 @@
+set hive.auto.convert.join=true;
+
+set hive.optimize.correlation=false;
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src x JOIN src1 y ON (x.key = y.key)
+ GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src x JOIN src1 y ON (x.key = y.key)
+ GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src x JOIN src1 y ON (x.key = y.key)
+ GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src x JOIN src1 y ON (x.key = y.key)
+ GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+
+set hive.auto.convert.join=true;
+set hive.auto.convert.join.noconditionaltask=true;
+set hive.auto.convert.join.noconditionaltask.size=10000000000;
+set hive.optimize.mapjoin.mapreduce=true;
+
+set hive.optimize.correlation=false;
+-- Without correlation optimizer, we will have 3 MR jobs.
+-- The first one is a MapJoin and Aggregation (in the Reduce Phase).
+-- The second one is another MapJoin. The third one is for ordering.
+-- With the correlation optimizer, right now, we still have
+-- 3 MR jobs. The first one is a MapJoin and the map-side aggregation (a map-only job).
+-- The second one has the reduce-side aggregation and the second join.
+-- The third one is for ordering.
+-- Although we have turned on hive.optimize.mapjoin.mapreduce, that optimizer
+-- cannot handle the case where the MR job (the one into which a map-only job will be merged)
+-- has multiple inputs. We should improve that optimizer.
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src x JOIN src1 y ON (x.key = y.key)
+ GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src x JOIN src1 y ON (x.key = y.key)
+ GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+
+set hive.optimize.correlation=true;
+EXPLAIN
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src x JOIN src1 y ON (x.key = y.key)
+ GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+
+SELECT xx.key, xx.cnt, yy.key, yy.value
+FROM (SELECT x.key AS key, count(1) AS cnt
+ FROM src x JOIN src1 y ON (x.key = y.key)
+ GROUP BY x.key) xx
+JOIN src1 yy
+ON xx.key=yy.key ORDER BY xx.key, xx.cnt, yy.key, yy.value;
+