You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ke...@apache.org on 2012/09/18 07:53:37 UTC
svn commit: r1386996 [1/5] - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/optimizer/
ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org/ap...
Author: kevinwilfong
Date: Tue Sep 18 05:53:35 2012
New Revision: 1386996
URL: http://svn.apache.org/viewvc?rev=1386996&view=rev
Log:
HIVE-3086. Skewed Join Optimization. njain via kevinwilfong
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt1.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt10.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt11.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt12.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt13.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt14.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt15.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt16.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt17.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt18.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt19.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt2.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt20.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt3.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt4.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt5.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt6.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt7.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt8.q
hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt9.q
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt1.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt10.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt11.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt12.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt13.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt14.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt15.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt16.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt17.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt18.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt19.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt2.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt20.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt3.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt4.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt5.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt6.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt7.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt8.q.out
hive/trunk/ql/src/test/results/clientpositive/skewjoinopt9.q.out
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeFieldDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Sep 18 05:53:35 2012
@@ -495,6 +495,9 @@ public class HiveConf extends Configurat
HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
+ // optimize skewed join by changing the query plan at compile time
+ HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME("hive.optimize.skewjoin.compiletime", false),
+
// Indexes
HIVEOPTINDEXFILTER_COMPACT_MINSIZE("hive.optimize.index.filter.compact.minsize", (long) 5 * 1024 * 1024 * 1024), // 5G
HIVEOPTINDEXFILTER_COMPACT_MAXSIZE("hive.optimize.index.filter.compact.maxsize", (long) -1), // infinity
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Tue Sep 18 05:53:35 2012
@@ -426,6 +426,27 @@
</property>
<property>
+ <name>hive.optimize.skewjoin.compiletime</name>
+ <value>false</value>
+ <description>Whether to create a separate plan for skewed keys for the tables in the join.
+ This is based on the skewed keys stored in the metadata. At compile time, the plan is broken
+ into different joins: one for the skewed keys, and the other for the remaining keys. And then,
+ a union is performed for the 2 joins generated above. So unless the same skewed key is present
+ in both the joined tables, the join for the skewed key will be performed as a map-side join.
+
+ The main difference between this parameter and hive.optimize.skewjoin is that this parameter
+ uses the skew information stored in the metastore to optimize the plan at compile time itself.
+ If there is no skew information in the metadata, this parameter will not have any effect.
+ Both hive.optimize.skewjoin.compiletime and hive.optimize.skewjoin should be set to true.
+ Ideally, hive.optimize.skewjoin should be renamed as hive.optimize.skewjoin.runtime, but not doing
+ so for backward compatibility.
+
+ If the skew information is correctly stored in the metadata, hive.optimize.skewjoin.compiletime
+ would change the query plan to take care of it, and hive.optimize.skewjoin will be a no-op.
+ </description>
+</property>
+
+<property>
<name>hive.multigroupby.singlemr</name>
<value>false</value>
<description>Whether to optimize multi group by query to generate single M/R
@@ -459,7 +480,13 @@
<property>
<name>hive.optimize.skewjoin</name>
<value>false</value>
- <description>Whether to enable skew join optimization. </description>
+ <description>Whether to enable skew join optimization.
+ The algorithm is as follows: At runtime, detect the keys with a large skew. Instead of
+ processing those keys, store them temporarily in an HDFS directory. In a follow-up map-reduce
+ job, process those skewed keys. The same key need not be skewed for all the tables, and so,
+ the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a
+ map-join.
+</description>
</property>
<property>
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Tue Sep 18 05:53:35 2012
@@ -160,4 +160,9 @@ public class FilterOperator extends Oper
public OperatorType getType() {
return OperatorType.FILTER;
}
+
+ @Override
+ public boolean supportSkewJoinOptimization() {
+ return true;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Tue Sep 18 05:53:35 2012
@@ -266,4 +266,11 @@ public class JoinOperator extends Common
}
}
+ @Override
+ public boolean supportSkewJoinOptimization() {
+ // Since skew join optimization makes a copy of the tree above joins, and
+ // there is no multi-query optimization in place, let us not use skew join
+ // optimizations for now.
+ return false;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Sep 18 05:53:35 2012
@@ -1338,6 +1338,10 @@ public abstract class Operator<T extends
public void cleanUpInputFileChangedOp() throws HiveException {
}
+ public boolean supportSkewJoinOptimization() {
+ return false;
+ }
+
@Override
public Operator<? extends OperatorDesc> clone()
throws CloneNotSupportedException {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Sep 18 05:53:35 2012
@@ -39,9 +39,9 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Tue Sep 18 05:53:35 2012
@@ -100,4 +100,9 @@ public class SelectOperator extends Oper
public OperatorType getType() {
return OperatorType.SELECT;
}
+
+ @Override
+ public boolean supportSkewJoinOptimization() {
+ return true;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Tue Sep 18 05:53:35 2012
@@ -278,4 +278,9 @@ public class TableScanOperator extends O
}
}
}
+
+ @Override
+ public boolean supportSkewJoinOptimization() {
+ return true;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Tue Sep 18 05:53:35 2012
@@ -55,6 +55,9 @@ public class Optimizer {
transformations.add(new PartitionPruner());
transformations.add(new PartitionConditionRemover());
}
+ if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
+ transformations.add(new SkewJoinOptimizer());
+ }
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGBYUSINGINDEX)) {
transformations.add(new RewriteGBUsingIndex());
}
@@ -88,7 +91,7 @@ public class Optimizer {
*/
public ParseContext optimize() throws SemanticException {
for (Transform t : transformations) {
- pctx = t.transform(pctx);
+ pctx = t.transform(pctx);
}
return pctx;
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java Tue Sep 18 05:53:35 2012
@@ -0,0 +1,681 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.UnionDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+/**
+ * SkewJoinOptimizer.
+ *
+ */
+public class SkewJoinOptimizer implements Transform {
+
+ private static final Log LOG = LogFactory.getLog(SkewJoinOptimizer.class.getName());
+ private static ParseContext parseContext;
+
+ public static class SkewJoinProc implements NodeProcessor {
+ public SkewJoinProc() {
+ super();
+ }
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ // We should be having a tree which looks like this
+ // TS -> * -> RS -
+ // \
+ // -> JOIN -> ..
+ // /
+ // TS -> * -> RS -
+ //
+ // We are in the join operator now.
+
+ SkewJoinOptProcCtx ctx = (SkewJoinOptProcCtx) procCtx;
+ parseContext = ctx.getpGraphContext();
+
+ JoinOperator joinOp = (JoinOperator)nd;
+ // This join has already been processed
+ if (ctx.getDoneJoins().contains(joinOp)) {
+ return null;
+ }
+
+ ctx.getDoneJoins().add(joinOp);
+
+ Operator<? extends OperatorDesc> currOp = joinOp;
+ boolean processSelect = false;
+
+ // Is there a select following
+ // Clone the select also. It is useful for a follow-on optimization where the union
+ // followed by a select star is completely removed.
+ if ((joinOp.getChildOperators().size() == 1) &&
+ (joinOp.getChildOperators().get(0) instanceof SelectOperator)) {
+ currOp = joinOp.getChildOperators().get(0);
+ processSelect = true;
+ }
+
+ List<TableScanOperator> tableScanOpsForJoin = new ArrayList<TableScanOperator>();
+ if (!getTableScanOpsForJoin(joinOp, tableScanOpsForJoin)) {
+ return null;
+ }
+
+ if ((tableScanOpsForJoin == null) || (tableScanOpsForJoin.isEmpty())) {
+ return null;
+ }
+
+ // Get the skewed values in all the tables
+ Map<List<ExprNodeDesc>, List<List<String>>> skewedValues =
+ getSkewedValues(joinOp, tableScanOpsForJoin);
+
+ // If there are no skewed values, nothing needs to be done
+ if (skewedValues == null || skewedValues.size() == 0) {
+ return null;
+ }
+
+ // After this optimization, the tree should be like:
+ // TS -> (FIL "skewed rows") * -> RS -
+ // \
+ // -> JOIN
+ // / \
+ // TS -> (FIL "skewed rows") * -> RS - \
+ // \
+ // -> UNION -> ..
+ // /
+ // TS -> (FIL "no skewed rows") * -> RS - /
+ // \ /
+ // -> JOIN
+ // /
+ // TS -> (FIL "no skewed rows") * -> RS -
+ //
+
+ // Create a clone of the operator
+ Operator<? extends OperatorDesc> currOpClone;
+ try {
+ currOpClone = currOp.clone();
+ insertRowResolvers(currOp, currOpClone, ctx);
+ } catch (CloneNotSupportedException e) {
+ LOG.debug("Operator tree could not be cloned");
+ return null;
+ }
+
+ JoinOperator joinOpClone;
+ if (processSelect) {
+ joinOpClone = (JoinOperator)(currOpClone.getParentOperators().get(0));
+ } else {
+ joinOpClone = (JoinOperator)currOpClone;
+ }
+
+ // Put the filter "skewed column = skewed keys" above the table scans of the original tree
+ // and "not (skewed column = skewed keys)" above the table scans of the cloned tree
+ insertSkewFilter(tableScanOpsForJoin, skewedValues, true);
+
+ List<TableScanOperator> tableScanCloneOpsForJoin =
+ new ArrayList<TableScanOperator>();
+ assert
+ getTableScanOpsForJoin(joinOpClone, tableScanCloneOpsForJoin);
+
+ insertSkewFilter(tableScanCloneOpsForJoin, skewedValues, false);
+
+ // Update the topOps appropriately
+ Map<String, Operator<? extends OperatorDesc>> topOps = getTopOps(joinOpClone);
+ Map<String, Operator<? extends OperatorDesc>> origTopOps = parseContext.getTopOps();
+
+ for (Entry<String, Operator<? extends OperatorDesc>> topOp : topOps.entrySet()) {
+ TableScanOperator tso = (TableScanOperator) topOp.getValue();
+ Table origTable = parseContext.getTopToTable().get(ctx.getCloneTSOpMap().get(tso));
+ String tabAlias = tso.getConf().getAlias();
+ parseContext.getTopToTable().put(tso, origTable);
+ int initCnt = 1;
+ String newAlias = "subquery" + initCnt + ":" + tabAlias;
+ while (origTopOps.containsKey(newAlias)) {
+ initCnt++;
+ newAlias = "subquery" + initCnt + ":" + tabAlias;
+ }
+
+ parseContext.getTopOps().put(newAlias, tso);
+ }
+
+ // Now do a union of the two branches: currOp and currOpClone
+ // Store the operators that follow currOp (the join, or the select after the join); we will be
+ // adding them as children of the select placed after the union later
+ List<Operator<? extends OperatorDesc>> finalOps = currOp.getChildOperators();
+ currOp.setChildOperators(null);
+ currOpClone.setChildOperators(null);
+
+ // Make the union operator
+ List<Operator<? extends OperatorDesc>> oplist =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ oplist.add(currOp);
+ oplist.add(currOpClone);
+ Operator<? extends OperatorDesc> unionOp =
+ OperatorFactory.getAndMakeChild(
+ new UnionDesc(), new RowSchema(currOp.getSchema().getSignature()), oplist);
+
+ RowResolver unionRR = parseContext.getOpParseCtx().get(currOp).getRowResolver();
+ GenMapRedUtils.putOpInsertMap(unionOp, unionRR, parseContext);
+
+ // Introduce a select after the union
+ List<Operator<? extends OperatorDesc>> unionList =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ unionList.add(unionOp);
+
+ Operator<? extends OperatorDesc> selectUnionOp =
+ OperatorFactory.getAndMakeChild(
+ new SelectDesc(true),
+ new RowSchema(unionOp.getSchema().getSignature()), unionList);
+ GenMapRedUtils.putOpInsertMap(selectUnionOp, unionRR, parseContext);
+
+ // add the finalOp after the union
+ selectUnionOp.setChildOperators(finalOps);
+ // replace currOp in the parents of the final operators with selectUnionOp
+ for (Operator<? extends OperatorDesc> finalOp : finalOps) {
+ finalOp.replaceParent(currOp, selectUnionOp);
+ }
+ return null;
+ }
+
+ /*
+ * Get the list of table scan operators for this join. An interface supportSkewJoinOptimization
+ * has been provided. Currently, it is only enabled for table scans, simple filters and selects.
+ */
+ private boolean getTableScanOpsForJoin(
+ JoinOperator op,
+ List<TableScanOperator> tsOps) {
+
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ if (!getTableScanOps(parent, tsOps)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private boolean getTableScanOps(
+ Operator<? extends OperatorDesc> op,
+ List<TableScanOperator> tsOps) {
+ for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+ if (!parent.supportSkewJoinOptimization()) {
+ return false;
+ }
+
+ if (parent instanceof TableScanOperator) {
+ tsOps.add((TableScanOperator)parent);
+ } else if (!getTableScanOps(parent, tsOps)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Returns the skewed values in all the tables which are going to be scanned.
+ * If the join is on columns c1, c2 and c3 on tables T1 and T2,
+ * T1 is skewed on c1 and c4 with the skew values ((1,2),(3,4)),
+ * whereas T2 is skewed on c1, c2 with skew values ((5,6),(7,8)), the resulting
+ * map would be: <(c1) -> ((1), (3)), (c1,c2) -> ((5,6),(7,8))>
+ * @param op The join operator being optimized
+ * @param tableScanOpsForJoin table scan operators which are parents of the join operator
+ * @return map<join keys intersection skewedkeys, list of skewed values>.
+ */
+ private Map<List<ExprNodeDesc>, List<List<String>>>
+ getSkewedValues(
+ Operator<? extends OperatorDesc> op, List<TableScanOperator> tableScanOpsForJoin) {
+
+ Map <List<ExprNodeDesc>, List<List<String>>> skewDataReturn =
+ new HashMap<List<ExprNodeDesc>, List<List<String>>>();
+
+ Map <List<ExprNodeDescEqualityWrapper>, List<List<String>>> skewData =
+ new HashMap<List<ExprNodeDescEqualityWrapper>, List<List<String>>>();
+
+ // The join keys are available in the reduceSinkOperators before join
+ for (Operator<? extends OperatorDesc> reduceSinkOp : op.getParentOperators()) {
+ ReduceSinkDesc rsDesc = ((ReduceSinkOperator) reduceSinkOp).getConf();
+
+ if (rsDesc.getKeyCols() != null) {
+ Table table = null;
+ // Find the skew information corresponding to the table
+ List<String> skewedColumns = null;
+ List<List<String>> skewedValueList = null;
+
+ // The join columns which are also skewed
+ List<ExprNodeDescEqualityWrapper> joinKeysSkewedCols =
+ new ArrayList<ExprNodeDescEqualityWrapper>();
+
+ // skewed Keys which intersect with join keys
+ List<Integer> positionSkewedKeys = new ArrayList<Integer>();
+
+ // Update the joinKeys appropriately.
+ for (ExprNodeDesc keyColDesc : rsDesc.getKeyCols()) {
+ ExprNodeColumnDesc keyCol = null;
+
+ // If the key column is not a column, then don't apply this optimization.
+ // This will be fixed as part of https://issues.apache.org/jira/browse/HIVE-3445
+ // for type conversion UDFs.
+ if (keyColDesc instanceof ExprNodeColumnDesc) {
+ keyCol = (ExprNodeColumnDesc) keyColDesc;
+ if (table == null) {
+ table = getTable(parseContext, reduceSinkOp, tableScanOpsForJoin);
+ skewedColumns =
+ table == null ? null : table.getSkewedColNames();
+ // No skew on the table to take care of
+ if ((skewedColumns == null) || (skewedColumns.isEmpty())) {
+ continue;
+ }
+
+ skewedValueList =
+ table == null ? null : table.getSkewedColValues();
+ }
+ int pos = skewedColumns.indexOf(keyCol.getColumn());
+ if ((pos >= 0) && (!positionSkewedKeys.contains(pos))) {
+ positionSkewedKeys.add(pos);
+ ExprNodeColumnDesc keyColClone = (ExprNodeColumnDesc) keyCol.clone();
+ keyColClone.setTabAlias(null);
+ joinKeysSkewedCols.add(new ExprNodeDescEqualityWrapper(keyColClone));
+ }
+ }
+ }
+
+ // If the skew keys match the join keys, then add it to the list
+ if ((skewedColumns != null) && (!skewedColumns.isEmpty())) {
+ if (!joinKeysSkewedCols.isEmpty()) {
+ // If the join keys match the skewed keys, use the table's skewed values as-is
+ List<List<String>> skewedJoinValues;
+ if (skewedColumns.size() == positionSkewedKeys.size()) {
+ skewedJoinValues = skewedValueList;
+ }
+ else {
+ skewedJoinValues =
+ getSkewedJoinValues(skewedValueList, positionSkewedKeys);
+ }
+
+ List<List<String>> oldSkewedJoinValues =
+ skewData.get(joinKeysSkewedCols);
+ if (oldSkewedJoinValues == null) {
+ oldSkewedJoinValues = new ArrayList<List<String>>();
+ }
+ for (List<String> skewValue : skewedJoinValues) {
+ if (!oldSkewedJoinValues.contains(skewValue)) {
+ oldSkewedJoinValues.add(skewValue);
+ }
+ }
+
+ skewData.put(joinKeysSkewedCols, oldSkewedJoinValues);
+ }
+ }
+ }
+ }
+
+ // convert skewData to contain ExprNodeDesc in the keys
+ for (Map.Entry<List<ExprNodeDescEqualityWrapper>, List<List<String>>> mapEntry :
+ skewData.entrySet()) {
+ List<ExprNodeDesc> skewedKeyJoinCols = new ArrayList<ExprNodeDesc>();
+ for (ExprNodeDescEqualityWrapper key : mapEntry.getKey()) {
+ skewedKeyJoinCols.add(key.getExprNodeDesc());
+ }
+ skewDataReturn.put(skewedKeyJoinCols, mapEntry.getValue());
+ }
+
+ return skewDataReturn;
+ }
+
+ /**
+ * Get the table read by the candidate table scan that feeds the given operator.
+ */
+ private Table getTable(
+ ParseContext parseContext,
+ Operator<? extends OperatorDesc> op,
+ List<TableScanOperator> tableScanOpsForJoin) {
+ while (true) {
+ if (op instanceof TableScanOperator) {
+ TableScanOperator tsOp = (TableScanOperator)op;
+ if (tableScanOpsForJoin.contains(tsOp)) {
+ return parseContext.getTopToTable().get(tsOp);
+ }
+ }
+ if ((op.getParentOperators() == null) || (op.getParentOperators().size() > 1)) {
+ return null;
+ }
+ op = op.getParentOperators().get(0);
+ }
+ }
+
+ /*
+ * If the skewedValues contains ((1,2,3),(4,5,6)), and the user is looking for
+ * positions (0,2), the result would be ((1,3),(4,6))
+ * Get the skewed key values that are part of the join key.
+ * @param skewedValuesList List of all the skewed values
+ * @param positionSkewedKeys the requested positions
+ * @return sub-list of skewed values with the positions present
+ */
+ private List<List<String>> getSkewedJoinValues(
+ List<List<String>> skewedValueList, List<Integer> positionSkewedKeys) {
+ List<List<String>> skewedJoinValues = new ArrayList<List<String>>();
+ for (List<String> skewedValuesAllColumns : skewedValueList) {
+ List<String> skewedValuesSpecifiedColumns = new ArrayList<String>();
+ for (int pos : positionSkewedKeys) {
+ skewedValuesSpecifiedColumns.add(skewedValuesAllColumns.get(pos));
+ }
+ skewedJoinValues.add(skewedValuesSpecifiedColumns);
+ }
+ return skewedJoinValues;
+ }
+
+ /**
+ * Inserts a filter comparing the join keys with the skewed keys. If the table
+ * is skewed with values (k1, v1) and (k2, v2) on columns (key, value), then
+ * filter ((key=k1 AND value=v1) OR (key=k2 AND value=v2)) is inserted. If @skewed
+ * is false, a NOT is inserted before it.
+ * @param tableScanOpsForJoin table scans for which the filter will be inserted
+ * @param skewedValuesList the map of <expressions, list of skewed values>
+ * @param skewed True if we want skewedCol = skewedValue, false if we want
+ * not (skewedCol = skewedValue)
+ */
+ private void insertSkewFilter(
+ List<TableScanOperator> tableScanOpsForJoin,
+ Map<List<ExprNodeDesc>, List<List<String>>> skewedValuesList,
+ boolean skewed) {
+
+ ExprNodeDesc filterExpr = constructFilterExpr(skewedValuesList, skewed);
+ for (TableScanOperator tableScanOp : tableScanOpsForJoin) {
+ insertFilterOnTop(tableScanOp, filterExpr);
+ }
+ }
+
+ /**
+ * Inserts a filter below the table scan operator. Construct the filter
+ * from the filter expression provided.
+ * @param tableScanOp the table scan operator
+ * @param filterExpr the filter expression
+ */
+ private void insertFilterOnTop(
+ TableScanOperator tableScanOp,
+ ExprNodeDesc filterExpr) {
+
+ // Get the top operator and its child, all operators have a single parent
+ Operator<? extends OperatorDesc> currChild = tableScanOp.getChildOperators().get(0);
+
+ // Create the filter Operator and update the parents and children appropriately
+ tableScanOp.setChildOperators(null);
+ currChild.setParentOperators(null);
+
+ Operator<FilterDesc> filter = OperatorFactory.getAndMakeChild(
+ new FilterDesc(filterExpr, false), tableScanOp);
+ filter.setSchema(new RowSchema(tableScanOp.getSchema().getSignature()));
+ OperatorFactory.makeChild(filter, currChild);
+
+ RowResolver filterRR = parseContext.getOpParseCtx().get(tableScanOp).getRowResolver();
+ GenMapRedUtils.putOpInsertMap(filter, filterRR, parseContext);
+ }
+
+ /**
+  * Construct the filter expression from the skewed keys and skewed values.
+  * If the skewed join keys are (k1), and (k1,k3) with the skewed values
+  * (1,2) and ((2,3),(4,5)) respectively, the filter expression would be:
+  * (k1=1) or (k1=2) or ((k1=2) and (k3=3)) or ((k1=4) and (k3=5)).
+  * @param skewedValuesMap map from the join key columns to their skewed value tuples;
+  *        the i-th value of a tuple is compared with the i-th key column
+  * @param skewed true to build the predicate itself, false to wrap it in a NOT
+  * @return the filter expression; null when the map holds no skewed values and
+  *         skewed is true
+  * @throws IllegalStateException if one of the comparison or boolean expressions
+  *         cannot be created
+  */
+ private ExprNodeDesc constructFilterExpr(
+     Map<List<ExprNodeDesc>, List<List<String>>> skewedValuesMap,
+     boolean skewed) {
+
+   ExprNodeDesc finalExprNodeDesc = null;
+   try {
+     for (Map.Entry<List<ExprNodeDesc>, List<List<String>>> mapEntry :
+         skewedValuesMap.entrySet()) {
+       List<ExprNodeDesc> keyCols = mapEntry.getKey();
+       List<List<String>> skewedValuesList = mapEntry.getValue();
+
+       for (List<String> skewedValues : skewedValuesList) {
+         int keyPos = 0;
+         ExprNodeDesc currExprNodeDesc = null;
+
+         // Make the following condition: all the values match for all the columns
+         for (String skewedValue : skewedValues) {
+           List<ExprNodeDesc> children = new ArrayList<ExprNodeDesc>();
+
+           // We have ensured that the keys are columns
+           ExprNodeColumnDesc keyCol = (ExprNodeColumnDesc) keyCols.get(keyPos).clone();
+           keyPos++;
+           children.add(keyCol);
+
+           // Convert the constants available as strings to the corresponding objects
+           children.add(createConstDesc(skewedValue, keyCol));
+
+           // Create the equality condition
+           ExprNodeGenericFuncDesc expr =
+               ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPEqual(), children);
+           if (currExprNodeDesc == null) {
+             currExprNodeDesc = expr;
+           } else {
+             // If there are previous nodes, then AND the current node with the previous one
+             List<ExprNodeDesc> childrenAND = new ArrayList<ExprNodeDesc>();
+             childrenAND.add(currExprNodeDesc);
+             childrenAND.add(expr);
+             currExprNodeDesc =
+                 ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPAnd(), childrenAND);
+           }
+         }
+
+         // If there are more than one skewed values,
+         // then OR the current node with the previous one
+         if (finalExprNodeDesc == null) {
+           finalExprNodeDesc = currExprNodeDesc;
+         } else {
+           List<ExprNodeDesc> childrenOR = new ArrayList<ExprNodeDesc>();
+           childrenOR.add(finalExprNodeDesc);
+           childrenOR.add(currExprNodeDesc);
+
+           finalExprNodeDesc =
+               ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPOr(), childrenOR);
+         }
+       }
+     }
+
+     // Add a NOT operator in the beginning (this is for the cloned operator because we
+     // want the values which are not skewed)
+     if (!skewed) {
+       List<ExprNodeDesc> childrenNOT = new ArrayList<ExprNodeDesc>();
+       childrenNOT.add(finalExprNodeDesc);
+       finalExprNodeDesc =
+           ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPNot(), childrenNOT);
+     }
+   } catch (UDFArgumentException e) {
+     // Not expected, because we are not comparing Long vs. String here. Fail loudly
+     // anyway: returning the partially built predicate (possibly without its NOT)
+     // would silently filter the wrong rows whenever assertions are disabled.
+     throw new IllegalStateException(
+         "Unable to construct the skew join filter expression", e);
+   }
+   return finalExprNodeDesc;
+ }
+
+ /**
+  * Builds the constant expression for a skewed value. The metadata holds the value as
+  * a string; it is converted to an object of the join key column's type.
+  * @param skewedValue the skewed value as a string
+  * @param keyCol the join key column whose type the constant takes
+  * @return an expression node descriptor of the appropriate constant
+  */
+ private ExprNodeConstantDesc createConstDesc(
+     String skewedValue, ExprNodeColumnDesc keyCol) {
+   // Converter from the standard string inspector to the inspector of the key's type
+   Converter stringToKeyType = ObjectInspectorConverters.getConverter(
+       TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(TypeInfoFactory.stringTypeInfo),
+       TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(keyCol.getTypeInfo()));
+   Object typedValue = stringToKeyType.convert(skewedValue);
+   return new ExprNodeConstantDesc(keyCol.getTypeInfo(), typedValue);
+ }
+
+ /**
+  * Collects the root operators found by walking up the parents of the given operator,
+  * keyed by alias. An operator without parents is cast to a table scan and contributes
+  * the alias from its configuration.
+  */
+ private Map<String, Operator<? extends OperatorDesc>> getTopOps(
+     Operator<? extends OperatorDesc> op) {
+   Map<String, Operator<? extends OperatorDesc>> roots =
+       new HashMap<String, Operator<? extends OperatorDesc>>();
+   List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
+
+   // No parents: this operator is itself a root
+   if (parents == null || parents.isEmpty()) {
+     roots.put(((TableScanOperator) op).getConf().getAlias(), op);
+     return roots;
+   }
+
+   // Otherwise gather the roots above every non-null parent
+   for (Operator<? extends OperatorDesc> parent : parents) {
+     if (parent == null) {
+       continue;
+     }
+     roots.putAll(getTopOps(parent));
+   }
+   return roots;
+ }
+
+ /**
+  * Registers a row resolver for every operator of a cloned operator tree. Each clone
+  * gets the row resolver of the original operator at the same position, and the walk
+  * continues through the parents of both trees in parallel. A cloned table scan is
+  * also recorded in the context, mapped to the table scan it was cloned from.
+  */
+ private void insertRowResolvers(
+     Operator<? extends OperatorDesc> op,
+     Operator<? extends OperatorDesc> opClone,
+     SkewJoinOptProcCtx ctx) {
+
+   if (op instanceof TableScanOperator) {
+     ctx.getCloneTSOpMap().put((TableScanOperator) opClone, (TableScanOperator) op);
+   }
+
+   RowResolver originalRR = parseContext.getOpParseCtx().get(op).getRowResolver();
+   GenMapRedUtils.putOpInsertMap(opClone, originalRR, parseContext);
+
+   List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
+   List<Operator<? extends OperatorDesc>> parentClones = opClone.getParentOperators();
+   // Stop as soon as either tree has no parents left
+   if (parents == null || parents.isEmpty()
+       || parentClones == null || parentClones.isEmpty()) {
+     return;
+   }
+
+   int pos = 0;
+   for (Operator<? extends OperatorDesc> parent : parents) {
+     insertRowResolvers(parent, parentClones.get(pos), ctx);
+     pos++;
+   }
+ }
+ }
+
+ /* (non-Javadoc)
+  * @see org.apache.hadoop.hive.ql.optimizer.Transform#transform
+  * (org.apache.hadoop.hive.ql.parse.ParseContext)
+  */
+ @Override
+ public ParseContext transform(ParseContext pctx) throws SemanticException {
+   // Single rule: the skew join processor fires on operator paths matching R1
+   Map<Rule, NodeProcessor> rules = new LinkedHashMap<Rule, NodeProcessor>();
+   rules.put(new RuleRegExp("R1", "TS%.*RS%JOIN%"), getSkewJoinProc());
+
+   // The dispatcher fires the processor corresponding to the closest matching
+   // rule and passes the context along
+   SkewJoinOptProcCtx procCtx = new SkewJoinOptProcCtx(pctx);
+   Dispatcher dispatcher = new DefaultRuleDispatcher(null, rules, procCtx);
+   GraphWalker walker = new DefaultGraphWalker(dispatcher);
+
+   // Start the walk from all the top operators
+   List<Node> topNodes = new ArrayList<Node>(pctx.getTopOps().values());
+   walker.startWalking(topNodes, null);
+   return pctx;
+ }
+
+ /**
+ * Creates the node processor that transform() registers for rule R1.
+ * @return a new SkewJoinProc
+ */
+ private NodeProcessor getSkewJoinProc() {
+ return new SkewJoinProc();
+ }
+
+ /**
+  * SkewJoinOptProcCtx. Context object passed to the node processors while the
+  * operator tree is walked.
+  */
+ public static class SkewJoinOptProcCtx implements NodeProcessorCtx {
+
+   private ParseContext pGraphContext;
+
+   // set of joins already processed
+   private Set<JoinOperator> doneJoins = new HashSet<JoinOperator>();
+
+   // cloned table scan -> the table scan it was cloned from
+   private Map<TableScanOperator, TableScanOperator> cloneTSOpMap =
+       new HashMap<TableScanOperator, TableScanOperator>();
+
+   public SkewJoinOptProcCtx(ParseContext pctx) {
+     pGraphContext = pctx;
+   }
+
+   public ParseContext getpGraphContext() {
+     return this.pGraphContext;
+   }
+
+   public void setPGraphContext(ParseContext graphContext) {
+     this.pGraphContext = graphContext;
+   }
+
+   public Set<JoinOperator> getDoneJoins() {
+     return this.doneJoins;
+   }
+
+   public void setDoneJoins(Set<JoinOperator> doneJoins) {
+     this.doneJoins = doneJoins;
+   }
+
+   public Map<TableScanOperator, TableScanOperator> getCloneTSOpMap() {
+     return this.cloneTSOpMap;
+   }
+
+   public void setCloneTSOpMap(Map<TableScanOperator, TableScanOperator> cloneTSOpMap) {
+     this.cloneTSOpMap = cloneTSOpMap;
+   }
+ }
+}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Tue Sep 18 05:53:35 2012
@@ -8356,7 +8356,7 @@ public class SemanticAnalyzer extends Ba
/**
* This code is commented out pending further testing/development
- * for (Task<? extends SerializableCloneable> t: rootTasks)
+ * for (Task<? extends OperatorDesc> t: rootTasks)
* t.localizeMRTmpFiles(ctx);
*/
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java Tue Sep 18 05:53:35 2012
@@ -22,6 +22,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
@@ -156,4 +157,14 @@ public class ExprNodeColumnDesc extends
public void setSkewedCol(boolean isSkewedCol) {
this.isSkewedCol = isSkewedCol;
}
+
+ @Override
+ public int hashCode() {
+   // Mix the base-class hash with the column name and the table alias.
+   return new HashCodeBuilder()
+       .appendSuper(super.hashCode())
+       .append(column)
+       .append(tabAlias)
+       .toHashCode();
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java Tue Sep 18 05:53:35 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
+import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
@@ -108,4 +109,13 @@ public class ExprNodeConstantDesc extend
return true;
}
+
+ @Override
+ public int hashCode() {
+   // Mix the base-class hash with the constant's value.
+   return new HashCodeBuilder()
+       .appendSuper(super.hashCode())
+       .append(value)
+       .toHashCode();
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java Tue Sep 18 05:53:35 2012
@@ -53,6 +53,11 @@ public abstract class ExprNodeDesc imple
// object equality - isSame means that the objects are semantically equal.
public abstract boolean isSame(Object o);
+ @Override
+ public int hashCode() {
+   return typeInfo == null ? 0 : typeInfo.hashCode(); // 0 when no type has been set
+ }
+
public TypeInfo getTypeInfo() {
return typeInfo;
}
@@ -116,5 +121,10 @@ public abstract class ExprNodeDesc imple
return this.exprNodeDesc.isSame(((ExprNodeDescEqualityWrapper)other).getExprNodeDesc());
}
+
+ @Override
+ public int hashCode() {
+   return exprNodeDesc != null ? exprNodeDesc.hashCode() : 0; // delegate to the wrapped node
+ }
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeFieldDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeFieldDesc.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeFieldDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeFieldDesc.java Tue Sep 18 05:53:35 2012
@@ -22,6 +22,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -122,4 +123,15 @@ public class ExprNodeFieldDesc extends E
return true;
}
+
+ @Override
+ public int hashCode() {
+   // Mix the base-class hash with the parent expression, field name and list flag.
+   return new HashCodeBuilder()
+       .appendSuper(super.hashCode())
+       .append(desc)
+       .append(fieldName)
+       .append(isList)
+       .toHashCode();
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java Tue Sep 18 05:53:35 2012
@@ -22,6 +22,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -268,6 +269,15 @@ public class ExprNodeGenericFuncDesc ext
return true;
}
+ @Override
+ public int hashCode() {
+   // Only the child expressions are mixed in on top of the base-class hash.
+   return new HashCodeBuilder()
+       .appendSuper(super.hashCode())
+       .append(childExprs)
+       .toHashCode();
+ }
+
public boolean isSortedExpr() {
return isSortedExpr;
}
@@ -275,5 +285,4 @@ public class ExprNodeGenericFuncDesc ext
public void setSortedExpr(boolean isSortedExpr) {
this.isSortedExpr = isSortedExpr;
}
-
}
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt1.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt1.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,38 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- a simple join query with skew on both the tables on the join key
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+-- test outer joins also
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;
+
+-- an aggregation at the end should not change anything
+
+EXPLAIN
+SELECT count(1) FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT count(1) FROM T1 a JOIN T2 b ON a.key = b.key;
+
+EXPLAIN
+SELECT count(1) FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;
+
+SELECT count(1) FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt10.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt10.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt10.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt10.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,17 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, value STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+drop table array_valued_T1;
+create table array_valued_T1 (key string, value array<string>) SKEWED BY (key) ON ((8));
+insert overwrite table array_valued_T1 select key, array(value) from T1;
+
+-- This test is to verify the skew join compile optimization when the join is followed by a lateral view
+explain
+select * from (select a.key as key, b.value as array_val from T1 a join array_valued_T1 b on a.key=b.key) i lateral view explode (array_val) c as val;
+
+select * from (select a.key as key, b.value as array_val from T1 a join array_valued_T1 b on a.key=b.key) i lateral view explode (array_val) c as val;
+
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt11.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt11.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt11.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt11.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,29 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- This test is to verify the skew join compile optimization when the join is followed
+-- by a union. Both sides of a union consist of a join, which should have used
+-- skew join compile time optimization.
+EXPLAIN
+select * from
+(
+ select a.key, a.val as val1, b.val as val2 from T1 a join T2 b on a.key = b.key
+ union all
+ select a.key, a.val as val1, b.val as val2 from T1 a join T2 b on a.key = b.key
+) subq1;
+
+select * from
+(
+ select a.key, a.val as val1, b.val as val2 from T1 a join T2 b on a.key = b.key
+ union all
+ select a.key, a.val as val1, b.val as val2 from T1 a join T2 b on a.key = b.key
+) subq1;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt12.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt12.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt12.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt12.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,20 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key, val) ON ((2, 12), (8, 18)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key, val) ON ((3, 13), (8, 18)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- Both the join tables are skewed by 2 keys, and one of the skewed values
+-- is common to both the tables. The join key matches the skewed key set.
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt13.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt13.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt13.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt13.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,33 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+CREATE TABLE T3(key STRING, val STRING)
+SKEWED BY (val) ON ((12)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T3.txt' INTO TABLE T3;
+
+-- This test is for skewed join compile time optimization for more than 2 tables.
+-- The join key for table 3 is different from the join key used for joining
+-- tables 1 and 2. Table 3 is skewed, but since one of the join sources for table
+-- 3 consists of a sub-query which contains a join, the compile time skew join
+-- optimization is not performed
+
+EXPLAIN
+select *
+from
+T1 a join T2 b on a.key = b.key
+join T3 c on a.val = c.val;
+
+select *
+from
+T1 a join T2 b on a.key = b.key
+join T3 c on a.val = c.val;
+
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt14.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt14.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt14.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt14.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,34 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+CREATE TABLE T3(key STRING, val STRING)
+SKEWED BY (val) ON ((12)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T3.txt' INTO TABLE T3;
+
+-- This test is for skewed join compile time optimization for more than 2 tables.
+-- The join key for table 3 is different from the join key used for joining
+-- tables 1 and 2. Tables 1 and 3 are skewed. Since one of the join sources for table
+-- 3 consists of a sub-query which contains a join, the compile time skew join
+-- optimization is not enabled for table 3, but it is used for the first join between
+-- tables 1 and 2
+EXPLAIN
+select *
+from
+T1 a join T2 b on a.key = b.key
+join T3 c on a.val = c.val;
+
+select *
+from
+T1 a join T2 b on a.key = b.key
+join T3 c on a.val = c.val;
+
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt15.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt15.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt15.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt15.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,46 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE tmpT1(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE tmpT1;
+
+-- testing skew on other data types - int
+CREATE TABLE T1(key INT, val STRING) SKEWED BY (key) ON ((2));
+INSERT OVERWRITE TABLE T1 SELECT key, val FROM tmpT1;
+
+CREATE TABLE tmpT2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE tmpT2;
+
+CREATE TABLE T2(key INT, val STRING) SKEWED BY (key) ON ((3));
+
+INSERT OVERWRITE TABLE T2 SELECT key, val FROM tmpT2;
+
+-- The skewed key is an integer column.
+-- Otherwise this test is similar to skewjoinopt1.q
+-- Both the joined tables are skewed, and the joined column
+-- is an integer
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+-- test outer joins also
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;
+
+-- an aggregation at the end should not change anything
+
+EXPLAIN
+SELECT count(1) FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT count(1) FROM T1 a JOIN T2 b ON a.key = b.key;
+
+EXPLAIN
+SELECT count(1) FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;
+
+SELECT count(1) FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt16.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt16.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt16.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt16.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,20 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key, val) ON ((2, 12)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- One of the tables is skewed by 2 columns, and the other table is
+-- skewed by one column. The join is performed on both the columns
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt17.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt17.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt17.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt17.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,45 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key, val) ON ((2, 12)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- One of the tables is skewed by 2 columns, and the other table is
+-- skewed by one column. The join is performed on the first skewed column.
+-- The skewed value for the join key is common to both the tables.
+-- In this case, the skewed join value is not repeated in the filter.
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+DROP TABLE T1;
+DROP TABLE T2;
+
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key, val) ON ((2, 12)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- One of the tables is skewed by 2 columns, and the other table is
+-- skewed by one column. The join is performed on both the columns.
+-- In this case, the skewed join value is repeated in the filter.
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt18.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt18.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt18.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt18.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,26 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE tmpT1(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE tmpT1;
+
+-- testing skew on other data types - int
+CREATE TABLE T1(key INT, val STRING) SKEWED BY (key) ON ((2));
+INSERT OVERWRITE TABLE T1 SELECT key, val FROM tmpT1;
+
+-- The skewed column is the same in both the tables, however it is
+-- INT in one of the tables, and STRING in the other table
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- Once HIVE-3445 is fixed, the compile time skew join optimization would be
+-- applicable here. Till the above jira is fixed, it would be performed as a
+-- regular join
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt19.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt19.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt19.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt19.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,20 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+CLUSTERED BY (key) INTO 4 BUCKETS
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- add a test where the skewed key is also the bucketized key
+-- it should not matter, and the compile time skewed join
+-- optimization is performed
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt2.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt2.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt2.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,41 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2), (7)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3), (8)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- a simple query with skew on both the tables on the join key
+-- multiple skew values are present for the skewed keys
+-- but the skewed values do not overlap.
+-- The join values are a superset of the skewed keys.
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;
+
+-- test outer joins also
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a LEFT OUTER JOIN T2 b ON a.key = b.key and a.val = b.val;
+
+SELECT a.*, b.* FROM T1 a LEFT OUTER JOIN T2 b ON a.key = b.key and a.val = b.val;
+
+-- a group by at the end should not change anything
+
+EXPLAIN
+SELECT a.key, count(1) FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val group by a.key;
+
+SELECT a.key, count(1) FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val group by a.key;
+
+EXPLAIN
+SELECT a.key, count(1) FROM T1 a LEFT OUTER JOIN T2 b ON a.key = b.key and a.val = b.val group by a.key;
+
+SELECT a.key, count(1) FROM T1 a LEFT OUTER JOIN T2 b ON a.key = b.key and a.val = b.val group by a.key;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt20.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt20.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt20.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt20.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,20 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- add a test where the skewed key is also the bucketized/sorted key
+-- it should not matter, and the compile time skewed join
+-- optimization is performed
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt3.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt3.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt3.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt3.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,28 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2), (8)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3), (8)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- a simple query with skew on both the tables. One of the skewed
+-- values is common to both the tables. The skewed value should not be
+-- repeated in the filter.
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+-- test outer joins also
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a FULL OUTER JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a FULL OUTER JOIN T2 b ON a.key = b.key;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt4.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt4.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt4.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt4.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,25 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- only one of the tables of the join (the left table of the join) is skewed
+-- the skewed filter would still be applied to both the tables
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+-- the order of the join should not matter, just confirming
+EXPLAIN
+SELECT a.*, b.* FROM T2 a JOIN T1 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T2 a JOIN T1 b ON a.key = b.key;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt5.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt5.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt5.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt5.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,20 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key, val) ON ((2, 12)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- One of the tables is skewed by 2 columns, and the other table is
+-- skewed by one column. The join is performed on the first skewed column
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt6.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt6.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt6.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt6.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,21 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key, val) ON ((2, 12), (8, 18)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key, val) ON ((3, 13), (8, 18)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- Both the join tables are skewed by 2 keys, and one of the skewed values
+-- is common to both the tables. The join key is a subset of the skewed key set:
+-- it only contains the first skewed key for both the tables
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt7.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt7.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt7.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt7.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,24 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2), (8)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3), (8)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+CREATE TABLE T3(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T3.txt' INTO TABLE T3;
+
+-- This test is for validating skewed join compile time optimization for more than
+-- 2 tables. The join key is the same, and so a 3-way join would be performed.
+-- 2 of the 3 tables are skewed on the join key
+EXPLAIN
+SELECT a.*, b.*, c.* FROM T1 a JOIN T2 b ON a.key = b.key JOIN T3 c on a.key = c.key;
+
+SELECT a.*, b.*, c.* FROM T1 a JOIN T2 b ON a.key = b.key JOIN T3 c on a.key = c.key;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt8.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt8.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt8.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt8.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,23 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3), (8)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+CREATE TABLE T3(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T3.txt' INTO TABLE T3;
+
+-- This test is for validating skewed join compile time optimization for more than
+-- 2 tables. The join key is the same, and so a 3-way join would be performed.
+-- 1 of the 3 tables is skewed on the join key
+EXPLAIN
+SELECT a.*, b.*, c.* FROM T1 a JOIN T2 b ON a.key = b.key JOIN T3 c on a.key = c.key;
+
+SELECT a.*, b.*, c.* FROM T1 a JOIN T2 b ON a.key = b.key JOIN T3 c on a.key = c.key;
Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt9.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt9.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt9.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt9.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,45 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- no skew join compile time optimization would be performed if one of the
+-- join sources is a sub-query consisting of a union all
+EXPLAIN
+select * from
+(
+select key, val from T1
+ union all
+select key, val from T1
+) subq1
+join T2 b on subq1.key = b.key;
+
+select * from
+(
+select key, val from T1
+ union all
+select key, val from T1
+) subq1
+join T2 b on subq1.key = b.key;
+
+-- no skew join compile time optimization would be performed if one of the
+-- join sources is a sub-query consisting of a group by
+EXPLAIN
+select * from
+(
+select key, count(1) as cnt from T1 group by key
+) subq1
+join T2 b on subq1.key = b.key;
+
+select * from
+(
+select key, count(1) as cnt from T1 group by key
+) subq1
+join T2 b on subq1.key = b.key;