Posted to commits@hive.apache.org by xu...@apache.org on 2014/12/17 04:19:58 UTC

svn commit: r1646144 - in /hive/branches/spark/ql/src: java/org/apache/hadoop/hive/ql/optimizer/ java/org/apache/hadoop/hive/ql/optimizer/physical/ java/org/apache/hadoop/hive/ql/optimizer/spark/ java/org/apache/hadoop/hive/ql/plan/ test/results/client...

Author: xuefu
Date: Wed Dec 17 03:19:57 2014
New Revision: 1646144

URL: http://svn.apache.org/r1646144
Log:
HIVE-9097: Support runtime skew join for more queries [Spark Branch] (Rui via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
    hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin_noskew.q.out

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Wed Dec 17 03:19:57 2014
@@ -983,7 +983,7 @@ public final class GenMapRedUtils {
    * @param parseCtx
    * @return The TableScanOperator inserted before child.
    */
-  protected static TableScanOperator createTemporaryFile(
+  public static TableScanOperator createTemporaryFile(
       Operator<? extends OperatorDesc> parent, Operator<? extends OperatorDesc> child,
       Path taskTmpDir, TableDesc tt_desc, ParseContext parseCtx) {
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java Wed Dec 17 03:19:57 2014
@@ -82,22 +82,13 @@ public class GenSparkSkewJoinProcessor {
   public static void processSkewJoin(JoinOperator joinOp, Task<? extends Serializable> currTask,
       ReduceWork reduceWork, ParseContext parseCtx) throws SemanticException {
 
-    // We are trying to adding map joins to handle skew keys, and map join right
-    // now does not work with outer joins
-    if (!GenMRSkewJoinProcessor.skewJoinEnabled(parseCtx.getConf(), joinOp) ||
-        !(currTask instanceof SparkTask)) {
-      return;
-    }
     SparkWork currentWork = ((SparkTask) currTask).getWork();
-    if (!supportRuntimeSkewJoin(currentWork, reduceWork)) {
+    if (currentWork.getChildren(reduceWork).size() > 0) {
+      LOG.warn("Skip runtime skew join as the ReduceWork has child work and hasn't been split.");
       return;
     }
 
     List<Task<? extends Serializable>> children = currTask.getChildTasks();
-    if (children != null && children.size() > 1) {
-      LOG.warn("Skip runtime skew join as current task has multiple children.");
-      return;
-    }
 
     Task<? extends Serializable> child =
         children != null && children.size() == 1 ? children.get(0) : null;
@@ -424,17 +415,11 @@ public class GenSparkSkewJoinProcessor {
     mapJoinDesc.setHashTableMemoryUsage(hashtableMemoryUsage);
   }
 
-  /**
-   * Currently, we only support the simplest cases where join is the last work
-   * of a spark work, i.e. the current ReduceWork is a leave work
-   * If the reduce work has follow-up work, e.g. an aggregation following the join,
-   * it's difficult to union the results of the original join and conditional map join
-   * and feed that to the follow up works. This is not an issue for MR, where ReduceWork
-   * is always a terminal work.
-   *
-   * TODO: can we relax this
-   */
-  private static boolean supportRuntimeSkewJoin(SparkWork sparkWork, BaseWork work) {
-    return sparkWork.getChildren(work).isEmpty();
+  // check this before calling processSkewJoin
+  public static boolean supportRuntimeSkewJoin(JoinOperator joinOp,
+      Task<? extends Serializable> currTask, HiveConf hiveConf) {
+    List<Task<? extends Serializable>> children = currTask.getChildTasks();
+    return GenMRSkewJoinProcessor.skewJoinEnabled(hiveConf, joinOp) &&
+        (children == null || children.size() <= 1);
   }
 }
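
supportRuntimeSkewJoin() is now a public precondition that callers are expected to test before
invoking processSkewJoin(): the checks that used to sit at the top of processSkewJoin() (skew join
enabled, at most one child task) have moved into it. A minimal caller sketch, assuming the join
operator, current task, target ReduceWork and ParseContext are already in hand from the enclosing
optimizer context; the helper name below is illustrative and not part of this patch:

    // Illustrative only: guard processSkewJoin() with the new public check.
    // Types are the ones used in this patch (org.apache.hadoop.hive.ql.exec.*,
    // .exec.spark.SparkTask, .plan.ReduceWork/SparkWork, .parse.ParseContext).
    static void maybeProcessSkewJoin(JoinOperator joinOp, Task<? extends Serializable> currTask,
        ReduceWork reduceWork, ParseContext parseCtx) throws SemanticException {
      if (!(currTask instanceof SparkTask) || reduceWork == null) {
        return;
      }
      SparkWork sparkWork = ((SparkTask) currTask).getWork();
      if (sparkWork.contains(reduceWork)
          && GenSparkSkewJoinProcessor.supportRuntimeSkewJoin(joinOp, currTask, parseCtx.getConf())) {
        // processSkewJoin() still skips the case where reduceWork is not a leaf of its
        // SparkWork, so callers should split the task first (see splitTask below).
        GenSparkSkewJoinProcessor.processSkewJoin(joinOp, currTask, reduceWork, parseCtx);
      }
    }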

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java Wed Dec 17 03:19:57 2014
@@ -78,7 +78,7 @@ public class SparkMapJoinResolver implem
     return matchingOps != null && !matchingOps.isEmpty();
   }
 
-  private Set<Operator<? extends OperatorDesc>> getOp(BaseWork work, Class<?> clazz) {
+  public static Set<Operator<? extends OperatorDesc>> getOp(BaseWork work, Class<?> clazz) {
     Set<Operator<? extends OperatorDesc>> ops = new HashSet<Operator<? extends OperatorDesc>>();
     if (work instanceof MapWork) {
       Collection<Operator<?>> opSet = ((MapWork) work).getAliasToWork().values();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinProcFactory.java Wed Dec 17 03:19:57 2014
@@ -18,18 +18,41 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.optimizer.physical.GenSparkSkewJoinProcessor;
 import org.apache.hadoop.hive.ql.optimizer.physical.SkewJoinProcFactory;
+import org.apache.hadoop.hive.ql.optimizer.physical.SparkMapJoinResolver;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.GenSparkUtils;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 
 import java.io.Serializable;
+import java.util.List;
+import java.util.Set;
 import java.util.Stack;
 
 /**
@@ -55,17 +78,136 @@ public class SparkSkewJoinProcFactory {
         Object... nodeOutputs) throws SemanticException {
       SparkSkewJoinResolver.SparkSkewJoinProcCtx context =
           (SparkSkewJoinResolver.SparkSkewJoinProcCtx) procCtx;
+      Task<? extends Serializable> currentTsk = context.getCurrentTask();
       JoinOperator op = (JoinOperator) nd;
-      if (op.getConf().isFixedAsSorted()) {
-        return null;
-      }
+      ReduceWork reduceWork = context.getReducerToReduceWork().get(op);
       ParseContext parseContext = context.getParseCtx();
-      Task<? extends Serializable> currentTsk = context.getCurrentTask();
-      if (currentTsk instanceof SparkTask) {
-        GenSparkSkewJoinProcessor.processSkewJoin(op, currentTsk,
-            context.getReducerToReduceWork().get(op), parseContext);
+      if (!op.getConf().isFixedAsSorted() && currentTsk instanceof SparkTask &&
+          reduceWork != null && ((SparkTask) currentTsk).getWork().contains(reduceWork) &&
+          GenSparkSkewJoinProcessor.supportRuntimeSkewJoin(
+              op, currentTsk, parseContext.getConf())) {
+        // first we try to split the task
+        splitTask((SparkTask) currentTsk, reduceWork, parseContext);
+        GenSparkSkewJoinProcessor.processSkewJoin(op, currentTsk, reduceWork, parseContext);
       }
       return null;
     }
   }
+
+  /**
+   * If the join is not in a leaf ReduceWork, the spark task has to be split into 2 tasks.
+   */
+  private static void splitTask(SparkTask currentTask, ReduceWork reduceWork,
+      ParseContext parseContext) throws SemanticException {
+    SparkWork currentWork = currentTask.getWork();
+    Set<Operator<? extends OperatorDesc>> reduceSinkSet =
+        SparkMapJoinResolver.getOp(reduceWork, ReduceSinkOperator.class);
+    if (currentWork.getChildren(reduceWork).size() == 1 && canSplit(currentWork) &&
+        reduceSinkSet.size() == 1) {
+      ReduceSinkOperator reduceSink = (ReduceSinkOperator) reduceSinkSet.iterator().next();
+      BaseWork childWork = currentWork.getChildren(reduceWork).get(0);
+      SparkEdgeProperty originEdge = currentWork.getEdgeProperty(reduceWork, childWork);
+      // disconnect the reduce work from its child. this should produce two isolated sub graphs
+      currentWork.disconnect(reduceWork, childWork);
+      // move works following the current reduce work into a new spark work
+      SparkWork newWork =
+          new SparkWork(parseContext.getConf().getVar(HiveConf.ConfVars.HIVEQUERYID));
+      newWork.add(childWork);
+      copyWorkGraph(currentWork, newWork, childWork, true);
+      copyWorkGraph(currentWork, newWork, childWork, false);
+      // remove them from current spark work
+      for (BaseWork baseWork : newWork.getAllWorkUnsorted()) {
+        currentWork.remove(baseWork);
+        // TODO: take care of cloneToWork?
+        currentWork.getCloneToWork().remove(baseWork);
+      }
+      // create TS to read intermediate data
+      Context baseCtx = parseContext.getContext();
+      Path taskTmpDir = baseCtx.getMRTmpPath();
+      Operator<? extends OperatorDesc> rsParent = reduceSink.getParentOperators().get(0);
+      TableDesc tableDesc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
+          .getFieldSchemasFromRowSchema(rsParent.getSchema(), "temporarycol"));
+      // this will insert FS and TS between the RS and its parent
+      TableScanOperator tableScanOp = GenMapRedUtils.createTemporaryFile(
+          rsParent, reduceSink, taskTmpDir, tableDesc, parseContext);
+      // create new MapWork
+      MapWork mapWork = PlanUtils.getMapRedWork().getMapWork();
+      mapWork.setName("Map " + GenSparkUtils.getUtils().getNextSeqNumber());
+      newWork.add(mapWork);
+      newWork.connect(mapWork, childWork, originEdge);
+      // setup the new map work
+      String streamDesc = taskTmpDir.toUri().toString();
+      if (GenMapRedUtils.needsTagging((ReduceWork) childWork)) {
+        Operator<? extends OperatorDesc> childReducer = ((ReduceWork) childWork).getReducer();
+        QBJoinTree joinTree = null;
+        if (childReducer instanceof JoinOperator) {
+          joinTree = parseContext.getJoinContext().get(childReducer);
+        } else if (childReducer instanceof MapJoinOperator) {
+          joinTree = parseContext.getMapJoinContext().get(childReducer);
+        } else if (childReducer instanceof SMBMapJoinOperator) {
+          joinTree = parseContext.getSmbMapJoinContext().get(childReducer);
+        }
+        if (joinTree != null && joinTree.getId() != null) {
+          streamDesc = joinTree.getId() + ":$INTNAME";
+        } else {
+          streamDesc = "$INTNAME";
+        }
+        // TODO: remove this?
+        String origStreamDesc = streamDesc;
+        int pos = 0;
+        while (mapWork.getAliasToWork().get(streamDesc) != null) {
+          streamDesc = origStreamDesc.concat(String.valueOf(++pos));
+        }
+      }
+      GenMapRedUtils.setTaskPlan(taskTmpDir.toUri().toString(), streamDesc,
+          tableScanOp, mapWork, false, tableDesc);
+      // insert the new task between current task and its child
+      Task<? extends Serializable> newTask = TaskFactory.get(newWork, parseContext.getConf());
+      List<Task<? extends Serializable>> childTasks = currentTask.getChildTasks();
+      // must have at most one child
+      if (childTasks != null && childTasks.size() > 0) {
+        Task<? extends Serializable> childTask = childTasks.get(0);
+        currentTask.removeDependentTask(childTask);
+        newTask.addDependentTask(childTask);
+      }
+      currentTask.addDependentTask(newTask);
+      newTask.setFetchSource(currentTask.isFetchSource());
+    }
+  }
+
+  /**
+   * Whether we can split at reduceWork. For simplicity, let's require that each work
+   * have at most one child work. This may be relaxed by checking the connectivity of
+   * the work graph after disconnecting the current reduce work from its child.
+   */
+  private static boolean canSplit(SparkWork sparkWork) {
+    for (BaseWork baseWork : sparkWork.getAllWorkUnsorted()) {
+      if (sparkWork.getChildren(baseWork).size() > 1) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Copy a sub-graph from originWork to newWork
+   */
+  private static void copyWorkGraph(SparkWork originWork, SparkWork newWork,
+      BaseWork baseWork, boolean upWards) {
+    if (upWards) {
+      for (BaseWork parent : originWork.getParents(baseWork)) {
+        newWork.add(parent);
+        SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(parent, baseWork);
+        newWork.connect(parent, baseWork, edgeProperty);
+        copyWorkGraph(originWork, newWork, parent, true);
+      }
+    } else {
+      for (BaseWork child : originWork.getChildren(baseWork)) {
+        newWork.add(child);
+        SparkEdgeProperty edgeProperty = originWork.getEdgeProperty(baseWork, child);
+        newWork.connect(baseWork, child, edgeProperty);
+        copyWorkGraph(originWork, newWork, child, false);
+      }
+    }
+  }
 }
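
The core of this change is splitTask(): when the skewed join's ReduceWork is not a leaf, the
SparkWork is cut at the join's ReduceSinkOperator, the intermediate rows are materialized through
the inserted FS/TS pair, and the child work plus everything still connected to it after the cut is
carried over into a new SparkWork (and a new SparkTask) by the two copyWorkGraph() calls. A
stripped-down model of that recursive copy, with hypothetical String/DagSketch types standing in
for BaseWork/SparkWork (a sketch of the idea, not Hive's API):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Sketch: starting from a seed node, pull its ancestors (upwards == true) or its
    // descendants (upwards == false) into a new graph, edge by edge. Calling it once in
    // each direction moves the seed's whole connected component, which mirrors what
    // splitTask() does for the child work left behind after disconnecting the ReduceWork.
    final class DagSketch {
      final Map<String, Set<String>> parents = new HashMap<String, Set<String>>();
      final Map<String, Set<String>> children = new HashMap<String, Set<String>>();

      void connect(String a, String b) {            // directed edge a -> b
        addTo(children, a, b);
        addTo(parents, b, a);
      }

      static void copyWorkGraph(DagSketch origin, DagSketch target, String seed, boolean upwards) {
        Set<String> neighbors = upwards ? origin.parents.get(seed) : origin.children.get(seed);
        if (neighbors == null) {
          return;
        }
        for (String n : neighbors) {
          if (upwards) {
            target.connect(n, seed);
            copyWorkGraph(origin, target, n, true);    // keep climbing towards the roots
          } else {
            target.connect(seed, n);
            copyWorkGraph(origin, target, n, false);   // keep descending towards the leaves
          }
        }
      }

      private static void addTo(Map<String, Set<String>> m, String k, String v) {
        Set<String> s = m.get(k);
        if (s == null) {
          s = new HashSet<String>();
          m.put(k, s);
        }
        s.add(v);
      }
    }

As in the patch, canSplit() keeps this tractable by requiring at most one child per work; relaxing
that would mean verifying, after the cut, that the two halves really are disjoint components.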

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSkewJoinResolver.java Wed Dec 17 03:19:57 2014
@@ -41,8 +41,10 @@ import org.apache.hadoop.hive.ql.plan.Sp
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Stack;
 
@@ -75,9 +77,6 @@ public class SparkSkewJoinResolver imple
       Task<? extends Serializable> task = (Task<? extends Serializable>) nd;
       if (task instanceof SparkTask) {
         SparkWork sparkWork = ((SparkTask) task).getWork();
-        if (sparkWork.getAllReduceWork().isEmpty()) {
-          return null;
-        }
         SparkSkewJoinProcCtx skewJoinProcCtx =
             new SparkSkewJoinProcCtx(task, physicalContext.getParseContext());
         Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
@@ -87,7 +86,10 @@ public class SparkSkewJoinResolver imple
             SparkSkewJoinProcFactory.getDefaultProc(), opRules, skewJoinProcCtx);
         GraphWalker ogw = new DefaultGraphWalker(disp);
         ArrayList<Node> topNodes = new ArrayList<Node>();
-        for (ReduceWork reduceWork : sparkWork.getAllReduceWork()) {
+        // since we may need to split the task, let's walk the graph bottom-up
+        List<ReduceWork> reduceWorkList = sparkWork.getAllReduceWork();
+        Collections.reverse(reduceWorkList);
+        for (ReduceWork reduceWork : reduceWorkList) {
           topNodes.add(reduceWork.getReducer());
           skewJoinProcCtx.getReducerToReduceWork().put(reduceWork.getReducer(), reduceWork);
         }
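
A short annotated excerpt of the new traversal, with the (assumed) rationale spelled out:

    // Why bottom-up: getAllReduceWork() returns works root-to-leaf (see the SparkWork.java
    // comment change below), so after Collections.reverse() the leaf-most reducers are
    // visited first. If an upstream join is later split into a separate SparkTask, the
    // works it drags along have already been examined while they still belonged to the
    // current task, and the contains(reduceWork) guard in SparkSkewJoinProcFactory keeps
    // any work that has been moved out from being processed against the wrong task.
    List<ReduceWork> reduceWorkList = sparkWork.getAllReduceWork();  // e.g. [Reducer 2, Reducer 3]
    Collections.reverse(reduceWorkList);                             // e.g. [Reducer 3, Reducer 2]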

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/SparkWork.java Wed Dec 17 03:19:57 2014
@@ -175,6 +175,7 @@ public class SparkWork extends AbstractO
     if (getChildren(a).isEmpty()) {
       leaves.add(a);
     }
+    edgeProperties.remove(new ImmutablePair<BaseWork, BaseWork>(a, b));
   }
 
   /**
@@ -397,7 +398,7 @@ public class SparkWork extends AbstractO
     return result;
   }
 
-  // get all reduce works in this spark work
+  // get all reduce works in this spark work in sorted order
   public List<ReduceWork> getAllReduceWork() {
     List<ReduceWork> result = new ArrayList<ReduceWork>();
     for (BaseWork work : getAllWork()) {
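
The new edgeProperties.remove(...) line closes a small gap in disconnect(): the edge itself was
already removed from the parent/child maps, but, as far as this hunk shows, its SparkEdgeProperty
entry stayed behind. That matters now that splitTask() disconnects a ReduceWork from its child and
later reconnects that child with the original edge property inside a new SparkWork. A minimal model
of the invariant, with plain String nodes and String properties standing in for BaseWork and
SparkEdgeProperty (illustrative types, not Hive's):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch: edge metadata must be dropped together with the edge itself, otherwise a
    // later lookup for the (a, b) pair can observe a stale property.
    final class EdgeMapSketch {
      private final Map<List<String>, String> edgeProperties = new HashMap<List<String>, String>();

      void connect(String a, String b, String property) {
        edgeProperties.put(Arrays.asList(a, b), property);
      }

      void disconnect(String a, String b) {
        // analogous to: edgeProperties.remove(new ImmutablePair<BaseWork, BaseWork>(a, b));
        edgeProperties.remove(Arrays.asList(a, b));
      }

      String getEdgeProperty(String a, String b) {
        return edgeProperties.get(Arrays.asList(a, b));   // null once disconnected
      }
    }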

Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out Wed Dec 17 03:19:57 2014
@@ -607,14 +607,17 @@ SELECT sum(hash(Y.key)), sum(hash(Y.valu
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
+  Stage-4 depends on stages: Stage-1 , consists of Stage-5, Stage-2
+  Stage-5
+  Stage-3 depends on stages: Stage-5
+  Stage-2 depends on stages: Stage-3
+  Stage-0 depends on stages: Stage-2
 
 STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
         Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 3), Map 4 (PARTITION-LEVEL SORT, 3)
-        Reducer 3 <- Reducer 2 (GROUP, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -660,6 +663,7 @@ STAGE PLANS:
                 condition expressions:
                   0 
                   1 {KEY.reducesinkkey0} {VALUE._col0}
+                handleSkewJoin: true
                 outputColumnNames: _col2, _col3
                 Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
                 Group By Operator
@@ -667,6 +671,72 @@ STAGE PLANS:
                   mode: hash
                   outputColumnNames: _col0, _col1
                   Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-4
+    Conditional Operator
+
+  Stage: Stage-5
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 7 
+            Map Operator Tree:
+                TableScan
+                  Spark HashTable Sink Operator
+                    condition expressions:
+                      0 
+                      1 {1_VALUE_0} {1_VALUE_1}
+                    keys:
+                      0 reducesinkkey0 (type: string)
+                      1 reducesinkkey0 (type: string)
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 6 
+            Map Operator Tree:
+                TableScan
+                  Map Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                    condition expressions:
+                      0 
+                      1 {1_VALUE_0} {1_VALUE_1}
+                    keys:
+                      0 reducesinkkey0 (type: string)
+                      1 reducesinkkey0 (type: string)
+                    outputColumnNames: _col2, _col3
+                    Group By Operator
+                      aggregations: sum(hash(_col2)), sum(hash(_col3))
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      File Output Operator
+                        compressed: false
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-2
+    Spark
+      Edges:
+        Reducer 3 <- Map 5 (GROUP, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 5 
+            Map Operator Tree:
+                TableScan
                   Reduce Output Operator
                     sort order: 
                     Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
@@ -731,14 +801,17 @@ SELECT sum(hash(Y.key)), sum(hash(Y.valu
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
+  Stage-4 depends on stages: Stage-1 , consists of Stage-5, Stage-2
+  Stage-5
+  Stage-3 depends on stages: Stage-5
+  Stage-2 depends on stages: Stage-3
+  Stage-0 depends on stages: Stage-2
 
 STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
         Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 3), Map 4 (PARTITION-LEVEL SORT, 3)
-        Reducer 3 <- Reducer 2 (GROUP, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -784,6 +857,7 @@ STAGE PLANS:
                 condition expressions:
                   0 
                   1 {KEY.reducesinkkey0} {VALUE._col0}
+                handleSkewJoin: true
                 outputColumnNames: _col2, _col3
                 Statistics: Num rows: 137 Data size: 1460 Basic stats: COMPLETE Column stats: NONE
                 Group By Operator
@@ -791,6 +865,72 @@ STAGE PLANS:
                   mode: hash
                   outputColumnNames: _col0, _col1
                   Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-4
+    Conditional Operator
+
+  Stage: Stage-5
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 7 
+            Map Operator Tree:
+                TableScan
+                  Spark HashTable Sink Operator
+                    condition expressions:
+                      0 
+                      1 {1_VALUE_0} {1_VALUE_1}
+                    keys:
+                      0 reducesinkkey0 (type: string), reducesinkkey1 (type: double)
+                      1 reducesinkkey0 (type: string), reducesinkkey1 (type: double)
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 6 
+            Map Operator Tree:
+                TableScan
+                  Map Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                    condition expressions:
+                      0 
+                      1 {1_VALUE_0} {1_VALUE_1}
+                    keys:
+                      0 reducesinkkey0 (type: string), reducesinkkey1 (type: double)
+                      1 reducesinkkey0 (type: string), reducesinkkey1 (type: double)
+                    outputColumnNames: _col2, _col3
+                    Group By Operator
+                      aggregations: sum(hash(_col2)), sum(hash(_col3))
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      File Output Operator
+                        compressed: false
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-2
+    Spark
+      Edges:
+        Reducer 3 <- Map 5 (GROUP, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 5 
+            Map Operator Tree:
+                TableScan
                   Reduce Output Operator
                     sort order: 
                     Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
@@ -863,14 +1003,19 @@ ON src1.c1 = src3.c5 AND src3.c5 < 80
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
+  Stage-5 depends on stages: Stage-1 , consists of Stage-6, Stage-7, Stage-2
+  Stage-6
+  Stage-3 depends on stages: Stage-6
+  Stage-2 depends on stages: Stage-3, Stage-4
+  Stage-7
+  Stage-4 depends on stages: Stage-7
+  Stage-0 depends on stages: Stage-2
 
 STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
         Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 3), Map 4 (PARTITION-LEVEL SORT, 3), Map 5 (PARTITION-LEVEL SORT, 3)
-        Reducer 3 <- Reducer 2 (GROUP, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -935,6 +1080,7 @@ STAGE PLANS:
                   0 {KEY.reducesinkkey0}
                   1 {VALUE._col0}
                   2 
+                handleSkewJoin: true
                 outputColumnNames: _col0, _col3
                 Statistics: Num rows: 121 Data size: 1284 Basic stats: COMPLETE Column stats: NONE
                 Group By Operator
@@ -942,6 +1088,91 @@ STAGE PLANS:
                   mode: hash
                   outputColumnNames: _col0, _col1
                   Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-5
+    Conditional Operator
+
+  Stage: Stage-6
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 8 
+            Map Operator Tree:
+                TableScan
+                  Spark HashTable Sink Operator
+                    condition expressions:
+                      0 {0_VALUE_0}
+                      1 {1_VALUE_0}
+                      2 
+                    keys:
+                      0 reducesinkkey0 (type: string)
+                      1 reducesinkkey0 (type: string)
+                      2 reducesinkkey0 (type: string)
+            Local Work:
+              Map Reduce Local Work
+        Map 9 
+            Map Operator Tree:
+                TableScan
+                  Spark HashTable Sink Operator
+                    condition expressions:
+                      0 {0_VALUE_0}
+                      1 {1_VALUE_0}
+                      2 
+                    keys:
+                      0 reducesinkkey0 (type: string)
+                      1 reducesinkkey0 (type: string)
+                      2 reducesinkkey0 (type: string)
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 7 
+            Map Operator Tree:
+                TableScan
+                  Map Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                         Inner Join 0 to 2
+                    condition expressions:
+                      0 {0_VALUE_0}
+                      1 {1_VALUE_0}
+                      2 
+                    keys:
+                      0 reducesinkkey0 (type: string)
+                      1 reducesinkkey0 (type: string)
+                      2 reducesinkkey0 (type: string)
+                    outputColumnNames: _col0, _col3
+                    Group By Operator
+                      aggregations: sum(hash(_col0)), sum(hash(_col3))
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      File Output Operator
+                        compressed: false
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-2
+    Spark
+      Edges:
+        Reducer 3 <- Map 6 (GROUP, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 6 
+            Map Operator Tree:
+                TableScan
                   Reduce Output Operator
                     sort order: 
                     Statistics: Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: NONE
@@ -965,6 +1196,72 @@ STAGE PLANS:
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
+  Stage: Stage-7
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 10 
+            Map Operator Tree:
+                TableScan
+                  Spark HashTable Sink Operator
+                    condition expressions:
+                      0 {0_VALUE_0}
+                      1 {1_VALUE_0}
+                      2 
+                    keys:
+                      0 reducesinkkey0 (type: string)
+                      1 reducesinkkey0 (type: string)
+                      2 reducesinkkey0 (type: string)
+            Local Work:
+              Map Reduce Local Work
+        Map 12 
+            Map Operator Tree:
+                TableScan
+                  Spark HashTable Sink Operator
+                    condition expressions:
+                      0 {0_VALUE_0}
+                      1 {1_VALUE_0}
+                      2 
+                    keys:
+                      0 reducesinkkey0 (type: string)
+                      1 reducesinkkey0 (type: string)
+                      2 reducesinkkey0 (type: string)
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-4
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 11 
+            Map Operator Tree:
+                TableScan
+                  Map Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                         Inner Join 0 to 2
+                    condition expressions:
+                      0 {0_VALUE_0}
+                      1 {1_VALUE_0}
+                      2 
+                    keys:
+                      0 reducesinkkey0 (type: string)
+                      1 reducesinkkey0 (type: string)
+                      2 reducesinkkey0 (type: string)
+                    outputColumnNames: _col0, _col3
+                    Group By Operator
+                      aggregations: sum(hash(_col0)), sum(hash(_col3))
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      File Output Operator
+                        compressed: false
+                        table:
+                            input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+            Local Work:
+              Map Reduce Local Work
+
   Stage: Stage-0
     Fetch Operator
       limit: -1

Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin_noskew.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin_noskew.q.out?rev=1646144&r1=1646143&r2=1646144&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin_noskew.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin_noskew.q.out Wed Dec 17 03:19:57 2014
@@ -6,16 +6,19 @@ create table noskew as select a.* from s
 POSTHOOK: type: CREATETABLE_AS_SELECT
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
-  Stage-3 depends on stages: Stage-0
-  Stage-2 depends on stages: Stage-3
+  Stage-5 depends on stages: Stage-1 , consists of Stage-6, Stage-3
+  Stage-6
+  Stage-4 depends on stages: Stage-6
+  Stage-3 depends on stages: Stage-4
+  Stage-0 depends on stages: Stage-3
+  Stage-7 depends on stages: Stage-0
+  Stage-2 depends on stages: Stage-7
 
 STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
         Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 1), Map 4 (PARTITION-LEVEL SORT, 1)
-        Reducer 3 <- Reducer 2 (SORT, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -53,13 +56,76 @@ STAGE PLANS:
                 condition expressions:
                   0 {KEY.reducesinkkey0} {VALUE._col0}
                   1 
+                handleSkewJoin: true
                 outputColumnNames: _col0, _col1
                 Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col0 (type: string)
-                  sort order: +
-                  Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
-                  value expressions: _col1 (type: string)
+                File Output Operator
+                  compressed: false
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-5
+    Conditional Operator
+
+  Stage: Stage-6
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 7 
+            Map Operator Tree:
+                TableScan
+                  Spark HashTable Sink Operator
+                    condition expressions:
+                      0 {0_VALUE_0} {0_VALUE_1}
+                      1 
+                    keys:
+                      0 reducesinkkey0 (type: string)
+                      1 reducesinkkey0 (type: string)
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-4
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 6 
+            Map Operator Tree:
+                TableScan
+                  Map Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                    condition expressions:
+                      0 {0_VALUE_0} {0_VALUE_1}
+                      1 
+                    keys:
+                      0 reducesinkkey0 (type: string)
+                      1 reducesinkkey0 (type: string)
+                    outputColumnNames: _col0, _col1
+                    File Output Operator
+                      compressed: false
+                      table:
+                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-3
+    Spark
+      Edges:
+        Reducer 3 <- Map 5 (SORT, 1)
+#### A masked pattern was here ####
+      Vertices:
+        Map 5 
+            Map Operator Tree:
+                TableScan
+                  Reduce Output Operator
+                    key expressions: _col0 (type: string)
+                    sort order: +
+                    Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
+                    value expressions: _col1 (type: string)
         Reducer 3 
             Reduce Operator Tree:
               Select Operator
@@ -84,7 +150,7 @@ STAGE PLANS:
           hdfs directory: true
 #### A masked pattern was here ####
 
-  Stage: Stage-3
+  Stage: Stage-7
       Create Table Operator:
         Create Table
           columns: key string, value string