You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ke...@apache.org on 2012/09/18 07:53:37 UTC

svn commit: r1386996 [1/5] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/parse/ ql/src/java/org/ap...

Author: kevinwilfong
Date: Tue Sep 18 05:53:35 2012
New Revision: 1386996

URL: http://svn.apache.org/viewvc?rev=1386996&view=rev
Log:
HIVE-3086. Skewed Join Optimization. njain via kevinwilfong

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt1.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt10.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt11.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt12.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt13.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt14.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt15.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt16.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt17.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt18.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt19.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt2.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt20.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt3.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt4.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt5.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt6.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt7.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt8.q
    hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt9.q
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt1.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt10.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt11.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt12.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt13.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt14.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt15.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt16.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt17.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt18.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt19.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt2.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt20.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt3.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt4.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt5.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt6.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt7.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt8.q.out
    hive/trunk/ql/src/test/results/clientpositive/skewjoinopt9.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeFieldDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Sep 18 05:53:35 2012
@@ -495,6 +495,9 @@ public class HiveConf extends Configurat
     HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
     HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
 
+    // optimize skewed join by changing the query plan at compile time
+    HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME("hive.optimize.skewjoin.compiletime", false),
+
     // Indexes
     HIVEOPTINDEXFILTER_COMPACT_MINSIZE("hive.optimize.index.filter.compact.minsize", (long) 5 * 1024 * 1024 * 1024), // 5G
     HIVEOPTINDEXFILTER_COMPACT_MAXSIZE("hive.optimize.index.filter.compact.maxsize", (long) -1), // infinity

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Tue Sep 18 05:53:35 2012
@@ -426,6 +426,27 @@
 </property>
 
 <property>
+  <name>hive.optimize.skewjoin.compiletime</name>
+  <value>false</value>
+  <description>Whether to create a separate plan for skewed keys for the tables in the join.
+    This is based on the skewed keys stored in the metadata. At compile time, the plan is broken
+    into different joins: one for the skewed keys, and the other for the remaining keys. And then,
+    a union is performed for the 2 joins generated above. So unless the same skewed key is present
+    in both the joined tables, the join for the skewed key will be performed as a map-side join.
+
+    The main difference between this parameter and hive.optimize.skewjoin is that this parameter
+    uses the skew information stored in the metastore to optimize the plan at compile time itself.
+    If there is no skew information in the metadata, this parameter will not have any effect.
+    Both hive.optimize.skewjoin.compiletime and hive.optimize.skewjoin should be set to true.
+    Ideally, hive.optimize.skewjoin should be renamed as hive.optimize.skewjoin.runtime, but not doing
+    so for backward compatibility.
+
+    If the skew information is correctly stored in the metadata, hive.optimize.skewjoin.compiletime
+    would change the query plan to take care of it, and hive.optimize.skewjoin will be a no-op.
+  </description>
+</property>
+
+<property>
   <name>hive.multigroupby.singlemr</name>
   <value>false</value>
   <description>Whether to optimize multi group by query to generate single M/R
@@ -459,7 +480,13 @@
 <property>
   <name>hive.optimize.skewjoin</name>
   <value>false</value>
-  <description>Whether to enable skew join optimization. </description>
+  <description>Whether to enable skew join optimization.
+    The algorithm is as follows: At runtime, detect the keys with a large skew. Instead of
+    processing those keys, store them temporarily in an HDFS directory. In a follow-up map-reduce
+    job, process those skewed keys. The same key need not be skewed for all the tables, and so,
+    the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a 
+    map-join.
+</description>
 </property>
 
 <property>

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Tue Sep 18 05:53:35 2012
@@ -160,4 +160,9 @@ public class FilterOperator extends Oper
   public OperatorType getType() {
     return OperatorType.FILTER;
   }
+
+  @Override
+  public boolean supportSkewJoinOptimization() {
+    return true;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Tue Sep 18 05:53:35 2012
@@ -266,4 +266,11 @@ public class JoinOperator extends Common
     }
   }
 
+  @Override
+  public boolean supportSkewJoinOptimization() {
+    // Since skew join optimization makes a copy of the tree above joins, and
+    // there is no multi-query optimization in place, let us not use skew join
+    // optimizations for now.
+    return false;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Sep 18 05:53:35 2012
@@ -1338,6 +1338,10 @@ public abstract class Operator<T extends
   public void cleanUpInputFileChangedOp() throws HiveException {
   }
 
+  public boolean supportSkewJoinOptimization() {
+    return false;
+  }
+
   @Override
   public Operator<? extends OperatorDesc> clone()
     throws CloneNotSupportedException {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Sep 18 05:53:35 2012
@@ -39,9 +39,9 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Tue Sep 18 05:53:35 2012
@@ -100,4 +100,9 @@ public class SelectOperator extends Oper
   public OperatorType getType() {
     return OperatorType.SELECT;
   }
+
+  @Override
+  public boolean supportSkewJoinOptimization() {
+    return true;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Tue Sep 18 05:53:35 2012
@@ -278,4 +278,9 @@ public class TableScanOperator extends O
       }
     }
   }
+
+  @Override
+  public boolean supportSkewJoinOptimization() {
+    return true;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Tue Sep 18 05:53:35 2012
@@ -55,6 +55,9 @@ public class Optimizer {
       transformations.add(new PartitionPruner());
       transformations.add(new PartitionConditionRemover());
     }
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
+      transformations.add(new SkewJoinOptimizer());
+    }
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTGBYUSINGINDEX)) {
       transformations.add(new RewriteGBUsingIndex());
     }
@@ -88,7 +91,7 @@ public class Optimizer {
    */
   public ParseContext optimize() throws SemanticException {
     for (Transform t : transformations) {
-      pctx = t.transform(pctx);
+        pctx = t.transform(pctx);
     }
     return pctx;
   }

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SkewJoinOptimizer.java Tue Sep 18 05:53:35 2012
@@ -0,0 +1,681 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc.ExprNodeDescEqualityWrapper;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.UnionDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNot;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+/**
+ * SkewJoinOptimizer.
+ *
+ */
+public class SkewJoinOptimizer implements Transform {
+
+  private static final Log LOG = LogFactory.getLog(SkewJoinOptimizer.class.getName());
+  private static ParseContext parseContext;
+
+  public static class SkewJoinProc implements NodeProcessor {
+    public SkewJoinProc() {
+      // Nothing to initialise; the implicit superclass constructor call is sufficient
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+      Object... nodeOutputs) throws SemanticException {
+      // Splits a join on skewed tables into two joins plus a union. Expected input tree:
+      //  TS -> * -> RS -
+      //                  \
+      //                   -> JOIN -> ..
+      //                  /
+      //  TS -> * -> RS -
+      //
+      // We are in the join operator now.
+
+      SkewJoinOptProcCtx ctx = (SkewJoinOptProcCtx) procCtx;
+      parseContext = ctx.getpGraphContext();
+
+      JoinOperator joinOp = (JoinOperator)nd;
+      // Each join is rewritten at most once; skip the ones already handled
+      if (ctx.getDoneJoins().contains(joinOp)) {
+        return null;
+      }
+
+      ctx.getDoneJoins().add(joinOp);
+
+      Operator<? extends OperatorDesc> currOp = joinOp;
+      boolean processSelect = false;
+
+      // Is there a select following
+      // Clone the select also. It is useful for a follow-on optimization where the union
+      // followed by a select star is completely removed.
+      if ((joinOp.getChildOperators().size() == 1) &&
+          (joinOp.getChildOperators().get(0) instanceof SelectOperator)) {
+        currOp = joinOp.getChildOperators().get(0);
+        processSelect = true;
+      }
+
+      List<TableScanOperator> tableScanOpsForJoin = new ArrayList<TableScanOperator>();
+      if (!getTableScanOpsForJoin(joinOp, tableScanOpsForJoin)) {
+        return null;
+      }
+
+      if ((tableScanOpsForJoin == null) || (tableScanOpsForJoin.isEmpty())) {
+        return null;
+      }
+
+      // Get the skewed values in all the tables
+      Map<List<ExprNodeDesc>, List<List<String>>> skewedValues =
+        getSkewedValues(joinOp, tableScanOpsForJoin);
+
+      // If there are no skewed values, nothing needs to be done
+      if (skewedValues == null || skewedValues.size() == 0) {
+        return null;
+      }
+
+      // After this optimization, the tree should be like:
+      //  TS -> (FIL "skewed rows") * -> RS -
+      //                                     \
+      //                                       ->   JOIN
+      //                                     /           \
+      //  TS -> (FIL "skewed rows") * -> RS -             \
+      //                                                   \
+      //                                                     ->  UNION -> ..
+      //                                                   /
+      //  TS -> (FIL "no skewed rows") * -> RS -          /
+      //                                        \        /
+      //                                         -> JOIN
+      //                                        /
+      //  TS -> (FIL "no skewed rows") * -> RS -
+      //
+
+      // Clone the operator tree ending at currOp (the join, or the select following it)
+      Operator<? extends OperatorDesc> currOpClone;
+      try {
+        currOpClone = currOp.clone();
+        insertRowResolvers(currOp, currOpClone, ctx);
+      } catch (CloneNotSupportedException e) {
+        LOG.debug("Operator tree could not be cloned");
+        return null;
+      }
+
+      JoinOperator joinOpClone;
+      if (processSelect) {
+        joinOpClone = (JoinOperator)(currOpClone.getParentOperators().get(0));
+      } else {
+        joinOpClone = (JoinOperator)currOpClone;
+      }
+
+      // Keep only the rows with skewed keys in the original tree; the complementary filter
+      // (NOT skewed) is inserted into the cloned tree further down
+      insertSkewFilter(tableScanOpsForJoin, skewedValues, true);
+
+      List<TableScanOperator> tableScanCloneOpsForJoin = new ArrayList<TableScanOperator>();
+      // Collect outside the assert: a disabled assert (no -ea) would skip the call entirely
+      boolean cloneScansFound = getTableScanOpsForJoin(joinOpClone, tableScanCloneOpsForJoin);
+      assert cloneScansFound;
+
+      insertSkewFilter(tableScanCloneOpsForJoin, skewedValues, false);
+
+      // Update the topOps appropriately
+      Map<String, Operator<? extends OperatorDesc>> topOps = getTopOps(joinOpClone);
+      Map<String, Operator<? extends OperatorDesc>> origTopOps = parseContext.getTopOps();
+
+      for (Entry<String, Operator<? extends OperatorDesc>> topOp : topOps.entrySet()) {
+        TableScanOperator tso = (TableScanOperator) topOp.getValue();
+        Table origTable = parseContext.getTopToTable().get(ctx.getCloneTSOpMap().get(tso));
+        String tabAlias = tso.getConf().getAlias();
+        parseContext.getTopToTable().put(tso, origTable);
+        int initCnt = 1;
+        String newAlias = "subquery" + initCnt + ":" + tabAlias;
+        while (origTopOps.containsKey(newAlias)) {
+          initCnt++;
+          newAlias = "subquery" + initCnt + ":" + tabAlias;
+        }
+
+        parseContext.getTopOps().put(newAlias, tso);
+      }
+
+      // Now do a union of the two branches: currOp and currOpClone
+      // Store the operator that follows the select after the join, we will be
+      // adding this as a child to the Union later
+      List<Operator<? extends OperatorDesc>> finalOps = currOp.getChildOperators();
+      currOp.setChildOperators(null);
+      currOpClone.setChildOperators(null);
+
+      // Make the union operator
+      List<Operator<? extends OperatorDesc>> oplist =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+      oplist.add(currOp);
+      oplist.add(currOpClone);
+      Operator<? extends OperatorDesc> unionOp =
+        OperatorFactory.getAndMakeChild(
+          new UnionDesc(), new RowSchema(currOp.getSchema().getSignature()), oplist);
+
+      RowResolver unionRR = parseContext.getOpParseCtx().get(currOp).getRowResolver();
+      GenMapRedUtils.putOpInsertMap(unionOp, unionRR, parseContext);
+
+      // Introduce a select after the union
+      List<Operator<? extends OperatorDesc>> unionList =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+      unionList.add(unionOp);
+
+      Operator<? extends OperatorDesc> selectUnionOp =
+        OperatorFactory.getAndMakeChild(
+          new SelectDesc(true),
+          new RowSchema(unionOp.getSchema().getSignature()), unionList);
+      GenMapRedUtils.putOpInsertMap(selectUnionOp, unionRR, parseContext);
+
+      // add the finalOp after the union
+      selectUnionOp.setChildOperators(finalOps);
+      // in each final op, replace the parent currOp with selectUnionOp
+      for (Operator<? extends OperatorDesc> finalOp : finalOps) {
+        finalOp.replaceParent(currOp, selectUnionOp);
+      }
+      return null;
+    }
+
+    /**
+     * Gathers into tsOps the table scans above every input of the join. Fails (false) when a
+     * branch has an operator whose supportSkewJoinOptimization() is false, e.g. another join.
+     */
+    private boolean getTableScanOpsForJoin(
+      JoinOperator op,
+      List<TableScanOperator> tsOps) {
+      for (Operator<? extends OperatorDesc> joinInput : op.getParentOperators()) {
+        boolean inputUsable = getTableScanOps(joinInput, tsOps);
+        if (!inputUsable) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    private boolean getTableScanOps(
+      Operator<? extends OperatorDesc> op,
+      List<TableScanOperator> tsOps) {
+      // Depth-first walk up the parents; one unsupported operator aborts the whole search
+      for (Operator<? extends OperatorDesc> ancestor : op.getParentOperators()) {
+        boolean usable = ancestor.supportSkewJoinOptimization();
+        if (usable && (ancestor instanceof TableScanOperator)) {
+          tsOps.add((TableScanOperator)ancestor);
+          continue;
+        }
+        if (!usable || !getTableScanOps(ancestor, tsOps)) {
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Returns the skewed values in all the tables which are going to be scanned.
+     * If the join is on columns c1, c2 and c3 on tables T1 and T2,
+     * T1 is skewed on c1 and c4 with the skew values ((1,2),(3,4)),
+     * whereas T2 is skewed on c1, c2 with skew values ((5,6),(7,8)), the resulting
+     * map would be: <(c1) -> ((1), (3)), (c1,c2) -> ((5,6),(7,8))>
+     * @param op The join operator being optimized
+     * @param tableScanOpsForJoin table scan operators which are parents of the join operator
+     * @return map<join keys that are also skewed keys, skewed values in that key order>.
+     */
+    private Map<List<ExprNodeDesc>, List<List<String>>>
+      getSkewedValues(
+        Operator<? extends OperatorDesc> op, List<TableScanOperator> tableScanOpsForJoin) {
+
+      Map <List<ExprNodeDesc>, List<List<String>>> skewDataReturn =
+        new HashMap<List<ExprNodeDesc>, List<List<String>>>();
+
+      Map <List<ExprNodeDescEqualityWrapper>, List<List<String>>> skewData =
+          new HashMap<List<ExprNodeDescEqualityWrapper>, List<List<String>>>();
+
+      // The join keys are available in the reduceSinkOperators before join
+      for (Operator<? extends OperatorDesc> reduceSinkOp : op.getParentOperators()) {
+        ReduceSinkDesc rsDesc = ((ReduceSinkOperator) reduceSinkOp).getConf();
+
+        if (rsDesc.getKeyCols() != null) {
+          Table table = null;
+          // Find the skew information corresponding to the table
+          List<String> skewedColumns = null;
+          List<List<String>> skewedValueList = null;
+
+          // The join columns which are also skewed
+          List<ExprNodeDescEqualityWrapper> joinKeysSkewedCols =
+            new ArrayList<ExprNodeDescEqualityWrapper>();
+
+          // skewed Keys which intersect with join keys
+          List<Integer> positionSkewedKeys = new ArrayList<Integer>();
+
+          // Update the joinKeys appropriately.
+          for (ExprNodeDesc keyColDesc : rsDesc.getKeyCols()) {
+            ExprNodeColumnDesc keyCol = null;
+
+            // If the key column is not a column, then don't apply this optimization.
+            // This will be fixed as part of https://issues.apache.org/jira/browse/HIVE-3445
+            // for type conversion UDFs.
+            if (keyColDesc instanceof ExprNodeColumnDesc) {
+              keyCol = (ExprNodeColumnDesc) keyColDesc;
+              if (table == null) {
+                table = getTable(parseContext, reduceSinkOp, tableScanOpsForJoin);
+                skewedColumns =
+                  table == null ? null : table.getSkewedColNames();
+                skewedValueList =
+                  table == null ? null : table.getSkewedColValues();
+              }
+              // No skew on the table to take care of. Tested for every key column so
+              // that a missing (null) column list can never reach indexOf below.
+              if ((skewedColumns == null) || (skewedColumns.isEmpty())) {
+                continue;
+              }
+              int pos = skewedColumns.indexOf(keyCol.getColumn());
+              if ((pos >= 0) && (!positionSkewedKeys.contains(pos))) {
+                positionSkewedKeys.add(pos);
+                ExprNodeColumnDesc keyColClone = (ExprNodeColumnDesc) keyCol.clone();
+                keyColClone.setTabAlias(null);
+                joinKeysSkewedCols.add(new ExprNodeDescEqualityWrapper(keyColClone));
+              }
+            }
+          }
+
+          // Record the skew values if at least one join key is also a skewed column
+          if ((skewedColumns != null) && (!skewedColumns.isEmpty())) {
+            if (!joinKeysSkewedCols.isEmpty()) {
+              // Always project by position. positionSkewedKeys is built in join key
+              // order, which may differ from the order of the table's skewed columns
+              // (for example a join on (c2, c1) against a table skewed on (c1, c2)).
+              // Returning the table's value list as-is, even when every skewed
+              // column is a join key, could pair a value with the wrong column of
+              // joinKeysSkewedCols. The projection keeps both aligned in every case.
+              // The projected tuples are fresh lists, not the table's own lists.
+              List<List<String>> skewedJoinValues =
+                getSkewedJoinValues(skewedValueList, positionSkewedKeys);
+
+              List<List<String>> oldSkewedJoinValues =
+                skewData.get(joinKeysSkewedCols);
+              if (oldSkewedJoinValues == null) {
+                oldSkewedJoinValues = new ArrayList<List<String>>();
+              }
+              for (List<String> skewValue : skewedJoinValues) {
+                if (!oldSkewedJoinValues.contains(skewValue)) {
+                  oldSkewedJoinValues.add(skewValue);
+                }
+              }
+
+              skewData.put(joinKeysSkewedCols, oldSkewedJoinValues);
+            }
+          }
+        }
+      }
+
+      // convert skewData to contain ExprNodeDesc in the keys
+      for (Map.Entry<List<ExprNodeDescEqualityWrapper>, List<List<String>>> mapEntry :
+        skewData.entrySet()) {
+          List<ExprNodeDesc> skewedKeyJoinCols = new ArrayList<ExprNodeDesc>();
+          for (ExprNodeDescEqualityWrapper key : mapEntry.getKey()) {
+            skewedKeyJoinCols.add(key.getExprNodeDesc());
+          }
+          skewDataReturn.put(skewedKeyJoinCols, mapEntry.getValue());
+      }
+
+      return skewDataReturn;
+    }
+
+    /**
+     * Walks up the single-parent chain starting at op and returns the Table of the first
+     * candidate scan met; null if the chain branches or ends before one is found.
+     */
+    private Table getTable(
+      ParseContext parseContext,
+      Operator<? extends OperatorDesc> op,
+      List<TableScanOperator> tableScanOpsForJoin) {
+      while (true) {
+        // Only a scan that feeds this join is an acceptable answer
+        if ((op instanceof TableScanOperator) && tableScanOpsForJoin.contains(op)) {
+          return parseContext.getTopToTable().get(op);
+        }
+        // Exactly one parent is required: an empty list must not reach get(0) below
+        if ((op.getParentOperators() == null) || (op.getParentOperators().size() != 1)) {
+          return null;
+        }
+        op = op.getParentOperators().get(0);
+      }
+    }
+
+    /**
+     * Projects every skewed value tuple onto the requested column positions.
+     * For example, if skewedValueList is ((1,2,3),(4,5,6)) and positionSkewedKeys
+     * is (0,2), the result is ((1,3),(4,6)).
+     * @param skewedValueList list of all the skewed value tuples
+     * @param positionSkewedKeys the positions to keep, in output order
+     * @return new list holding, for each tuple, only the values at those positions
+     */
+    private List<List<String>> getSkewedJoinValues(
+      List<List<String>> skewedValueList, List<Integer> positionSkewedKeys) {
+      List<List<String>> projected = new ArrayList<List<String>>(skewedValueList.size());
+      for (List<String> fullTuple : skewedValueList) {
+        List<String> picked = new ArrayList<String>(positionSkewedKeys.size());
+        for (int position : positionSkewedKeys) {
+          picked.add(fullTuple.get(position));
+        }
+        projected.add(picked);
+      }
+      return projected;
+    }
+
+    /**
+     * Builds one filter expression from the skewed keys and values and inserts it on
+     * every given table scan. With skewed values (k1, v1) and (k2, v2) on columns
+     * (key, value), the filter is ((key=k1 AND value=v1) OR (key=k2 AND value=v2));
+     * a NOT is put in front of it when skewed is false.
+     * @param tableScanOpsForJoin table scans for which the filter will be inserted
+     * @param skewedValuesList map from the key expressions to their lists of skewed values
+     * @param skewed True if we want skewedCol = skewedValue, false if we want
+     * not (skewedCol = skewedValue)
+     */
+    private void insertSkewFilter(
+      List<TableScanOperator> tableScanOpsForJoin,
+      Map<List<ExprNodeDesc>, List<List<String>>> skewedValuesList,
+      boolean skewed) {
+
+      ExprNodeDesc skewPredicate = constructFilterExpr(skewedValuesList, skewed);
+      for (int idx = 0; idx < tableScanOpsForJoin.size(); idx++) {
+        insertFilterOnTop(tableScanOpsForJoin.get(idx), skewPredicate);
+      }
+    }
+
+    /**
+     * Inserts a filter operator between the table scan operator and its first child.
+     * The filter is constructed from the filter expression provided.
+     * @param tableScanOp the table scan operator that becomes the filter's parent
+     * @param filterExpr the filter expression
+     */
+    private void insertFilterOnTop(
+      TableScanOperator tableScanOp,
+      ExprNodeDesc filterExpr) {
+
+      // Take the table scan's first child; NOTE(review): assumes it is the only child
+      Operator<? extends OperatorDesc> currChild = tableScanOp.getChildOperators().get(0);
+
+      // Detach both ends, then create the filter operator and link it in between them
+      tableScanOp.setChildOperators(null);
+      currChild.setParentOperators(null);
+
+      Operator<FilterDesc> filter = OperatorFactory.getAndMakeChild(
+        new FilterDesc(filterExpr, false), tableScanOp);
+      filter.setSchema(new RowSchema(tableScanOp.getSchema().getSignature()));
+      OperatorFactory.makeChild(filter, currChild);
+
+      RowResolver filterRR = parseContext.getOpParseCtx().get(tableScanOp).getRowResolver();
+      GenMapRedUtils.putOpInsertMap(filter, filterRR, parseContext);
+    }
+
+    /**
+     * Construct the filter expression from the skewed keys and skewed values; it is
+     * negated when skewed is false. For skewed join keys (k1) and (k1,k3) with the
+     * skewed values (1,2) and ((2,3),(4,5)) respectively, the filter expression is:
+     * (k1=1) or (k1=2) or ((k1=2) and (k3=3)) or ((k1=4) and (k3=5)).
+     */
+    private ExprNodeDesc constructFilterExpr(
+      Map<List<ExprNodeDesc>, List<List<String>>> skewedValuesMap,
+      boolean skewed) {
+
+      ExprNodeDesc finalExprNodeDesc = null;
+      try {
+        for (Map.Entry<List<ExprNodeDesc>, List<List<String>>> mapEntry :
+          skewedValuesMap.entrySet()) {
+          List<ExprNodeDesc> keyCols = mapEntry.getKey();
+          List<List<String>> skewedValuesList = mapEntry.getValue();
+
+          for (List<String> skewedValues : skewedValuesList) {
+            int keyPos = 0;
+            ExprNodeDesc currExprNodeDesc = null;
+
+            // Build the condition that every key column equals its value in this tuple
+            for (String skewedValue : skewedValues) {
+              List<ExprNodeDesc> children = new ArrayList<ExprNodeDesc>();
+
+              // We have ensured that the keys are columns; a clone of the key is used
+              ExprNodeColumnDesc keyCol = (ExprNodeColumnDesc) keyCols.get(keyPos).clone();
+              keyPos++;
+              children.add(keyCol);
+
+              // Convert the constants available as strings to the corresponding objects
+              children.add(createConstDesc(skewedValue, keyCol));
+
+              ExprNodeGenericFuncDesc expr = null;
+              // Create the equality condition
+              expr = ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPEqual(), children);
+              if (currExprNodeDesc == null) {
+                currExprNodeDesc = expr;
+              } else {
+                // If there are previous nodes, then AND the current node with the previous one
+                List<ExprNodeDesc> childrenAND = new ArrayList<ExprNodeDesc>();
+                childrenAND.add(currExprNodeDesc);
+                childrenAND.add(expr);
+                currExprNodeDesc =
+                  ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPAnd(), childrenAND);
+              }
+            }
+
+            // If there is more than one skewed value,
+            // then OR the current node with the previous one
+            if (finalExprNodeDesc == null) {
+              finalExprNodeDesc = currExprNodeDesc;
+            } else {
+              List<ExprNodeDesc> childrenOR = new ArrayList<ExprNodeDesc>();
+              childrenOR.add(finalExprNodeDesc);
+              childrenOR.add(currExprNodeDesc);
+
+              finalExprNodeDesc =
+                ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPOr(), childrenOR);
+            }
+          }
+        }
+
+        // Add a NOT operator in the beginning (this is for the cloned operator because we
+        // want the values which are not skewed)
+        if (skewed == false) {
+          List<ExprNodeDesc> childrenNOT = new ArrayList<ExprNodeDesc>();
+          childrenNOT.add(finalExprNodeDesc);
+          finalExprNodeDesc =
+            ExprNodeGenericFuncDesc.newInstance(new GenericUDFOPNot(), childrenNOT);
+        }
+      } catch (UDFArgumentException e) {
+        // Not expected: we are not comparing Long vs. String here. NOTE: if assertions
+        // are disabled, whatever was built before the failure (possibly null) is returned
+        assert false;
+      }
+      return finalExprNodeDesc;
+    }
+
+    /**
+     * Builds a constant of the join key column's type from a skewed value, which is
+     * available only as a string in the metadata.
+     * @param skewedValue the skewed value as a string
+     * @param keyCol the join key column whose type the constant takes
+     * @return an expression node descriptor of the appropriate constant
+     */
+    private ExprNodeConstantDesc createConstDesc(
+      String skewedValue, ExprNodeColumnDesc keyCol) {
+      ObjectInspector stringOI = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+        TypeInfoFactory.stringTypeInfo);
+      ObjectInspector columnOI = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+        keyCol.getTypeInfo());
+      Converter toColumnType = ObjectInspectorConverters.getConverter(stringOI, columnOI);
+      return new ExprNodeConstantDesc(
+        keyCol.getTypeInfo(), toColumnType.convert(skewedValue));
+    }
+
+    private Map<String, Operator<? extends OperatorDesc>> getTopOps(
+      Operator<? extends OperatorDesc> op) {
+      Map<String, Operator<? extends OperatorDesc>> rootOpsByAlias =
+        new HashMap<String, Operator<? extends OperatorDesc>>();
+      if (op.getParentOperators() != null && !op.getParentOperators().isEmpty()) {
+        for (Operator<? extends OperatorDesc> parentOp : op.getParentOperators()) {
+          if (parentOp != null) {
+            rootOpsByAlias.putAll(getTopOps(parentOp)); // gather the roots above each parent
+          }
+        }
+      } else { // no parents: op is a root, and is cast to a table scan to read its alias
+        rootOpsByAlias.put(((TableScanOperator)op).getConf().getAlias(), op);
+      }
+      return rootOpsByAlias;
+    }
+
+    // Walks op and opClone upwards in lockstep: each clone is registered with the row
+    // resolver of its original, and each cloned table scan is mapped to its original.
+    private void insertRowResolvers(
+      Operator<? extends OperatorDesc> op,
+      Operator<? extends OperatorDesc> opClone,
+      SkewJoinOptProcCtx ctx) {
+      if (op instanceof TableScanOperator) {
+        ctx.getCloneTSOpMap().put((TableScanOperator)opClone, (TableScanOperator)op);
+      }
+      RowResolver originalRR = parseContext.getOpParseCtx().get(op).getRowResolver();
+      GenMapRedUtils.putOpInsertMap(opClone, originalRR, parseContext);
+      List<Operator<? extends OperatorDesc>> origParents = op.getParentOperators();
+      List<Operator<? extends OperatorDesc>> cloneParents = opClone.getParentOperators();
+      if (origParents == null || origParents.isEmpty()
+        || cloneParents == null || cloneParents.isEmpty()) {
+        return;
+      }
+      for (int i = 0; i < origParents.size(); i++) {
+        insertRowResolvers(origParents.get(i), cloneParents.get(i), ctx);
+      }
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.hadoop.hive.ql.optimizer.Transform#transform
+   * (org.apache.hadoop.hive.ql.parse.ParseContext)
+   */
+  @Override
+  public ParseContext transform(ParseContext pctx) throws SemanticException {
+    // Single rule R1; its processor is the skew join processor
+    Map<Rule, NodeProcessor> rules = new LinkedHashMap<Rule, NodeProcessor>();
+    rules.put(new RuleRegExp("R1", "TS%.*RS%JOIN%"), getSkewJoinProc());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    SkewJoinOptProcCtx procCtx = new SkewJoinOptProcCtx(pctx);
+    Dispatcher dispatcher = new DefaultRuleDispatcher(null, rules, procCtx);
+    GraphWalker walker = new DefaultGraphWalker(dispatcher);
+
+    // Walk the operator graph starting from all the top operators
+    List<Node> startNodes = new ArrayList<Node>(pctx.getTopOps().values());
+    walker.startWalking(startNodes, null);
+
+    return pctx;
+  }
+
+  private NodeProcessor getSkewJoinProc() {
+    return new SkewJoinProc(); // a new processor instance on every call
+  }
+
+  /**
+   * SkewJoinOptProcCtx: processor context holding the parse context, the set of joins
+   * already processed, and the map from cloned table scans to their originals.
+   */
+  public static class SkewJoinOptProcCtx implements NodeProcessorCtx {
+
+    private ParseContext pGraphContext;
+
+    // set of joins already processed
+    private Set<JoinOperator> doneJoins;
+    private Map<TableScanOperator, TableScanOperator> cloneTSOpMap; // clone -> original
+
+    public SkewJoinOptProcCtx(ParseContext pctx) {
+      this.pGraphContext = pctx;
+      doneJoins = new HashSet<JoinOperator>();
+      cloneTSOpMap = new HashMap<TableScanOperator, TableScanOperator>();
+    }
+
+    public ParseContext getpGraphContext() {
+      return pGraphContext;
+    }
+
+    public void setPGraphContext(ParseContext graphContext) {
+      pGraphContext = graphContext;
+    }
+
+    public Set<JoinOperator> getDoneJoins() {
+      return doneJoins;
+    }
+
+    public void setDoneJoins(Set<JoinOperator> doneJoins) {
+      this.doneJoins = doneJoins;
+    }
+
+    public Map<TableScanOperator, TableScanOperator> getCloneTSOpMap() {
+      return cloneTSOpMap;
+    }
+
+    public void setCloneTSOpMap(Map<TableScanOperator, TableScanOperator> cloneTSOpMap) {
+      this.cloneTSOpMap = cloneTSOpMap;
+    }
+  }
+}

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Tue Sep 18 05:53:35 2012
@@ -8356,7 +8356,7 @@ public class SemanticAnalyzer extends Ba
 
       /**
        * This code is commented out pending further testing/development
-       * for (Task<? extends SerializableCloneable> t: rootTasks)
+       * for (Task<? extends OperatorDesc> t: rootTasks)
        * t.localizeMRTmpFiles(ctx);
        */
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeColumnDesc.java Tue Sep 18 05:53:35 2012
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
@@ -156,4 +157,14 @@ public class ExprNodeColumnDesc extends 
   public void setSkewedCol(boolean isSkewedCol) {
     this.isSkewedCol = isSkewedCol;
   }
+
+  @Override
+  public int hashCode() {
+    // Mix the base-class hash with the column name and the table alias
+    return new HashCodeBuilder()
+      .appendSuper(super.hashCode())
+      .append(column)
+      .append(tabAlias)
+      .toHashCode();
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeConstantDesc.java Tue Sep 18 05:53:35 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.io.Serializable;
 
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
@@ -108,4 +109,13 @@ public class ExprNodeConstantDesc extend
 
     return true;
   }
+
+  @Override
+  public int hashCode() {
+    // Mix the base-class hash with the constant's value
+    return new HashCodeBuilder()
+      .appendSuper(super.hashCode())
+      .append(value)
+      .toHashCode();
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeDesc.java Tue Sep 18 05:53:35 2012
@@ -53,6 +53,11 @@ public abstract class ExprNodeDesc imple
   // object equality - isSame means that the objects are semantically equal.
   public abstract boolean isSame(Object o);
 
+  @Override
+  public int hashCode() {
+    return typeInfo.hashCode(); // type only; subclasses add their own fields on top
+  }
+
   public TypeInfo getTypeInfo() {
     return typeInfo;
   }
@@ -116,5 +121,10 @@ public abstract class ExprNodeDesc imple
 
       return this.exprNodeDesc.isSame(((ExprNodeDescEqualityWrapper)other).getExprNodeDesc());
     }
+
+    @Override
+    public int hashCode() {
+      return exprNodeDesc != null ? exprNodeDesc.hashCode() : 0;
+    }
   }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeFieldDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeFieldDesc.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeFieldDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeFieldDesc.java Tue Sep 18 05:53:35 2012
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 
@@ -122,4 +123,15 @@ public class ExprNodeFieldDesc extends E
 
     return true;
   }
+
+  @Override
+  public int hashCode() {
+    // Mix the base-class hash with the nested expression, the field name and isList
+    return new HashCodeBuilder()
+      .appendSuper(super.hashCode())
+      .append(desc)
+      .append(fieldName)
+      .append(isList)
+      .toHashCode();
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java?rev=1386996&r1=1386995&r2=1386996&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java Tue Sep 18 05:53:35 2012
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -268,6 +269,15 @@ public class ExprNodeGenericFuncDesc ext
     return true;
   }
 
+  @Override
+  public int hashCode() {
+    // Mix the base-class hash with the child expressions
+    return new HashCodeBuilder()
+      .appendSuper(super.hashCode())
+      .append(childExprs)
+      .toHashCode();
+  }
+
   public boolean isSortedExpr() {
     return isSortedExpr;
   }
@@ -275,5 +285,4 @@ public class ExprNodeGenericFuncDesc ext
   public void setSortedExpr(boolean isSortedExpr) {
     this.isSortedExpr = isSortedExpr;
   }
-
 }

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt1.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt1.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt1.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt1.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,38 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- a simple join query with skew on both the tables on the join key
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+-- test outer joins also
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;
+
+-- an aggregation at the end should not change anything
+
+EXPLAIN
+SELECT count(1) FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT count(1) FROM T1 a JOIN T2 b ON a.key = b.key;
+
+EXPLAIN
+SELECT count(1) FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;
+
+SELECT count(1) FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt10.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt10.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt10.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt10.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,17 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, value STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+drop table array_valued_T1;
+create table array_valued_T1 (key string, value array<string>) SKEWED BY (key) ON ((8));
+insert overwrite table array_valued_T1 select key, array(value) from T1;
+
+-- This test is to verify the skew join compile optimization when the join is followed by a lateral view
+explain 
+select * from (select a.key as key, b.value as array_val from T1 a join array_valued_T1 b on a.key=b.key) i lateral view explode (array_val) c as val;
+
+select * from (select a.key as key, b.value as array_val from T1 a join array_valued_T1 b on a.key=b.key) i lateral view explode (array_val) c as val;
+

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt11.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt11.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt11.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt11.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,29 @@
+set hive.internal.ddl.list.bucketing.enable=true;	
+set hive.optimize.skewjoin.compiletime = true;
+    
+CREATE TABLE T1(key STRING, val STRING)	
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;	
+       
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;	
+     
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;	
+       
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;	
+     
+-- This test is to verify the skew join compile optimization when the join is followed
+-- by a union. Both sides of a union consist of a join, which should have used
+-- skew join compile time optimization.
+EXPLAIN	 
+select * from	
+(      
+  select a.key, a.val as val1, b.val as val2 from T1 a join T2 b on a.key = b.key
+    union all 	
+  select a.key, a.val as val1, b.val as val2 from T1 a join T2 b on a.key = b.key
+) subq1; 
+  
+select * from	
+(      
+  select a.key, a.val as val1, b.val as val2 from T1 a join T2 b on a.key = b.key
+    union all 	
+  select a.key, a.val as val1, b.val as val2 from T1 a join T2 b on a.key = b.key
+) subq1;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt12.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt12.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt12.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt12.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,20 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key, val) ON ((2, 12), (8, 18)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key, val) ON ((3, 13), (8, 18)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- Both the join tables are skewed by 2 keys, and one of the skewed values
+-- is common to both the tables. The join key matches the skewed key set.
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt13.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt13.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt13.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt13.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,33 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+CREATE TABLE T3(key STRING, val STRING)
+SKEWED BY (val) ON ((12)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T3.txt' INTO TABLE T3;
+
+-- This test is for skewed join compile time optimization for more than 2 tables.
+-- The join key for table 3 is different from the join key used for joining
+-- tables 1 and 2. Table 3 is skewed, but since one of the join sources for table
+-- 3 consist of a sub-query which contains a join, the compile time skew join 
+-- optimization is not performed
+
+EXPLAIN
+select *
+from 
+T1 a join T2 b on a.key = b.key 
+join T3 c on a.val = c.val;
+
+select *
+from 
+T1 a join T2 b on a.key = b.key 
+join T3 c on a.val = c.val;
+

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt14.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt14.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt14.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt14.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,34 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+CREATE TABLE T3(key STRING, val STRING)
+SKEWED BY (val) ON ((12)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T3.txt' INTO TABLE T3;
+
+-- This test is for skewed join compile time optimization for more than 2 tables.
+-- The join key for table 3 is different from the join key used for joining
+-- tables 1 and 2. Tables 1 and 3 are skewed. Since one of the join sources for table
+-- 3 consist of a sub-query which contains a join, the compile time skew join 
+-- optimization is not enabled for table 3, but it is used for the first join between
+-- tables 1 and 2
+EXPLAIN
+select *
+from 
+T1 a join T2 b on a.key = b.key 
+join T3 c on a.val = c.val;
+
+select *
+from 
+T1 a join T2 b on a.key = b.key 
+join T3 c on a.val = c.val;
+

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt15.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt15.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt15.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt15.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,46 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE tmpT1(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE tmpT1;
+
+-- testing skew on other data types - int
+CREATE TABLE T1(key INT, val STRING) SKEWED BY (key) ON ((2));
+INSERT OVERWRITE TABLE T1 SELECT key, val FROM tmpT1;
+
+CREATE TABLE tmpT2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE tmpT2;
+
+CREATE TABLE T2(key INT, val STRING) SKEWED BY (key) ON ((3));
+
+INSERT OVERWRITE TABLE T2 SELECT key, val FROM tmpT2;
+
+-- The skewed key is a integer column.
+-- Otherwise this test is similar to skewjoinopt1.q
+-- Both the joined tables are skewed, and the joined column
+-- is an integer
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+-- test outer joins also
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;
+
+-- an aggregation at the end should not change anything
+
+EXPLAIN
+SELECT count(1) FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT count(1) FROM T1 a JOIN T2 b ON a.key = b.key;
+
+EXPLAIN
+SELECT count(1) FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;
+
+SELECT count(1) FROM T1 a RIGHT OUTER JOIN T2 b ON a.key = b.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt16.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt16.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt16.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt16.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,20 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key, val) ON ((2, 12)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- One of the tables is skewed by 2 columns, and the other table is
+-- skewed by one column. Ths join is performed on the both the columns
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt17.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt17.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt17.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt17.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,45 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key, val) ON ((2, 12)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- One of the tables is skewed by 2 columns, and the other table is
+-- skewed by one column. Ths join is performed on the first skewed column
+-- The skewed value for the jon key is common to both the tables.
+-- In this case, the skewed join value is not repeated in the filter.
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+DROP TABLE T1;
+DROP TABLE T2;
+
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key, val) ON ((2, 12)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- One of the tables is skewed by 2 columns, and the other table is
+-- skewed by one column. Ths join is performed on the both the columns
+-- In this case, the skewed join value is repeated in the filter.
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt18.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt18.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt18.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt18.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,26 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE tmpT1(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE tmpT1;
+
+-- testing skew on other data types - int
+CREATE TABLE T1(key INT, val STRING) SKEWED BY (key) ON ((2));
+INSERT OVERWRITE TABLE T1 SELECT key, val FROM tmpT1;
+
+-- The skewed column is the same in both the tables, however it is
+-- INT in one of the tables, and STRING in the other table
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- Once HIVE-3445 is fixed, the compile time skew join optimization would be
+-- applicable here. Till the above jira is fixed, it would be performed as a
+-- regular join
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt19.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt19.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt19.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt19.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,20 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+CLUSTERED BY (key) INTO 4 BUCKETS
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- add a test where the skewed key is also the bucketized key
+-- it should not matter, and the compile time skewed join
+-- optimization is performed
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt2.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt2.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt2.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,41 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2), (7)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3), (8)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- a simple query with skew on both the tables on the join key
+-- multiple skew values are present for the skewed keys
+-- but the skewed values do not overlap.
+-- The join keys are a superset of the skewed keys.
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val;
+
+-- test outer joins also
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a LEFT OUTER JOIN T2 b ON a.key = b.key and a.val = b.val;
+
+SELECT a.*, b.* FROM T1 a LEFT OUTER JOIN T2 b ON a.key = b.key and a.val = b.val;
+
+-- a group by at the end should not change anything
+
+EXPLAIN
+SELECT a.key, count(1) FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val group by a.key;
+
+SELECT a.key, count(1) FROM T1 a JOIN T2 b ON a.key = b.key and a.val = b.val group by a.key;
+
+EXPLAIN
+SELECT a.key, count(1) FROM T1 a LEFT OUTER JOIN T2 b ON a.key = b.key and a.val = b.val group by a.key;
+
+SELECT a.key, count(1) FROM T1 a LEFT OUTER JOIN T2 b ON a.key = b.key and a.val = b.val group by a.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt20.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt20.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt20.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt20.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,20 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+CLUSTERED BY (key) SORTED BY (key) INTO 4 BUCKETS
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- add a test where the skewed key is also the bucketized/sorted key
+-- it should not matter, and the compile time skewed join
+-- optimization is performed
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt3.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt3.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt3.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt3.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,28 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2), (8)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3), (8)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- a simple query with skew on both the tables. One of the skewed
+-- values is common to both the tables. The skewed value should not be
+-- repeated in the filter.
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+-- test outer joins also
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a FULL OUTER JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a FULL OUTER JOIN T2 b ON a.key = b.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt4.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt4.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt4.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt4.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,25 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- only one of the tables of the join (the left table of the join) is skewed
+-- the skewed filter would still be applied to both the tables
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+-- the order of the join should not matter, just confirming
+EXPLAIN
+SELECT a.*, b.* FROM T2 a JOIN T1 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T2 a JOIN T1 b ON a.key = b.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt5.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt5.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt5.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt5.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,20 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key, val) ON ((2, 12)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- One of the tables is skewed by 2 columns, and the other table is
+-- skewed by one column. The join is performed on the first skewed column.
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt6.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt6.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt6.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt6.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,21 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key, val) ON ((2, 12), (8, 18)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key, val) ON ((3, 13), (8, 18)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- Both the join tables are skewed by 2 keys, and one of the skewed values
+-- is common to both the tables. The join key is a subset of the skewed key set:
+-- it only contains the first skewed key for both the tables
+
+EXPLAIN
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;
+
+SELECT a.*, b.* FROM T1 a JOIN T2 b ON a.key = b.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt7.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt7.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt7.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt7.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,24 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2), (8)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3), (8)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+CREATE TABLE T3(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T3.txt' INTO TABLE T3;
+
+-- This test is for validating skewed join compile time optimization for more than
+-- 2 tables. The join key is the same, and so a 3-way join would be performed.
+-- 2 of the 3 tables are skewed on the join key
+EXPLAIN
+SELECT a.*, b.*, c.* FROM T1 a JOIN T2 b ON a.key = b.key JOIN T3 c on a.key = c.key;
+
+SELECT a.*, b.*, c.* FROM T1 a JOIN T2 b ON a.key = b.key JOIN T3 c on a.key = c.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt8.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt8.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt8.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt8.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,23 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING)
+SKEWED BY (key) ON ((3), (8)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+CREATE TABLE T3(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T3.txt' INTO TABLE T3;
+
+-- This test is for validating skewed join compile time optimization for more than
+-- 2 tables. The join key is the same, and so a 3-way join would be performed.
+-- 1 of the 3 tables is skewed on the join key
+EXPLAIN
+SELECT a.*, b.*, c.* FROM T1 a JOIN T2 b ON a.key = b.key JOIN T3 c on a.key = c.key;
+
+SELECT a.*, b.*, c.* FROM T1 a JOIN T2 b ON a.key = b.key JOIN T3 c on a.key = c.key;

Added: hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt9.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt9.q?rev=1386996&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt9.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/skewjoinopt9.q Tue Sep 18 05:53:35 2012
@@ -0,0 +1,45 @@
+set hive.internal.ddl.list.bucketing.enable=true;
+set hive.optimize.skewjoin.compiletime = true;
+
+CREATE TABLE T1(key STRING, val STRING)
+SKEWED BY (key) ON ((2)) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T1.txt' INTO TABLE T1;
+
+CREATE TABLE T2(key STRING, val STRING) STORED AS TEXTFILE;
+
+LOAD DATA LOCAL INPATH '../data/files/T2.txt' INTO TABLE T2;
+
+-- no skew join compile time optimization would be performed if one of the
+-- join sources is a sub-query consisting of a union all
+EXPLAIN
+select * from
+(
+select key, val from T1
+  union all 
+select key, val from T1
+) subq1
+join T2 b on subq1.key = b.key;
+
+select * from
+(
+select key, val from T1
+  union all 
+select key, val from T1
+) subq1
+join T2 b on subq1.key = b.key;
+
+-- no skew join compile time optimization would be performed if one of the
+-- join sources is a sub-query consisting of a group by
+EXPLAIN
+select * from
+(
+select key, count(1) as cnt from T1 group by key
+) subq1
+join T2 b on subq1.key = b.key;
+
+select * from
+(
+select key, count(1) as cnt from T1 group by key
+) subq1
+join T2 b on subq1.key = b.key;