Posted to commits@hive.apache.org by xu...@apache.org on 2014/12/12 14:24:02 UTC

svn commit: r1644869 - in /hive/branches/spark/ql/src: java/org/apache/hadoop/hive/ql/optimizer/physical/ java/org/apache/hadoop/hive/ql/parse/spark/ test/results/clientpositive/spark/

Author: xuefu
Date: Fri Dec 12 13:24:01 2014
New Revision: 1644869

URL: http://svn.apache.org/r1644869
Log:
HIVE-8913: Make SparkMapJoinResolver handle runtime skew join [Spark Branch] (Rui via Xuefu)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join_stats.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java?rev=1644869&r1=1644868&r2=1644869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenSparkSkewJoinProcessor.java Fri Dec 12 13:24:01 2014
@@ -296,9 +296,6 @@ public class GenSparkSkewJoinProcessor {
           path = bigKeysDirMap.get(tags[j]);
           bigKeysDirToTaskMap.put(path, skewJoinMapJoinTask);
           bigMapWork = mapWork;
-          // in MR, ReduceWork is a terminal work, but that's not the case for spark, therefore for
-          // big dir MapWork, we'll have to clone all dependent works in the original work graph
-          cloneWorkGraph(currentWork, sparkWork, reduceWork, mapWork);
         } else {
           path = smallTblDirs.get(tags[j]);
         }
@@ -379,6 +376,8 @@ public class GenSparkSkewJoinProcessor {
     dummyOp.setChildOperators(mapJoinChildren);
     bigMapWork.addDummyOp(dummyOp);
     MapJoinDesc mjDesc = mapJoinOp.getConf();
+    // mapjoin should not be affected by join reordering
+    mjDesc.resetOrder();
     SparkHashTableSinkDesc hashTableSinkDesc = new SparkHashTableSinkDesc(mjDesc);
     SparkHashTableSinkOperator hashTableSinkOp =
         (SparkHashTableSinkOperator) OperatorFactory.get(hashTableSinkDesc);
@@ -398,6 +397,7 @@ public class GenSparkSkewJoinProcessor {
         new ArrayList<Operator<? extends OperatorDesc>>();
     tableScanParents.add(tableScan);
     hashTableSinkOp.setParentOperators(tableScanParents);
+    hashTableSinkOp.setTag(tag);
   }
 
   private static void setMemUsage(MapJoinOperator mapJoinOp, Task<? extends Serializable> task,
@@ -412,8 +412,6 @@ public class GenSparkSkewJoinProcessor {
       return;
     }
     MapJoinDesc mapJoinDesc = mapJoinOp.getConf();
-    // mapjoin should not affected by join reordering
-    mapJoinDesc.resetOrder();
     HiveConf conf = context.getParseCtx().getConf();
     float hashtableMemoryUsage;
     if (context.isFollowedByGroupBy()) {
@@ -426,42 +424,17 @@ public class GenSparkSkewJoinProcessor {
     mapJoinDesc.setHashTableMemoryUsage(hashtableMemoryUsage);
   }
 
-  private static void cloneWorkGraph(SparkWork originSparkWork, SparkWork newSparkWork,
-      BaseWork originWork, BaseWork newWork) {
-    for (BaseWork child : originSparkWork.getChildren(originWork)) {
-      SparkEdgeProperty edgeProperty = originSparkWork.getEdgeProperty(originWork, child);
-      BaseWork cloneChild = Utilities.cloneBaseWork(child);
-      cloneChild.setName(cloneChild.getName().replaceAll("^([a-zA-Z]+)(\\s+)(\\d+)",
-          "$1$2" + GenSparkUtils.getUtils().getNextSeqNumber()));
-      newSparkWork.add(cloneChild);
-      newSparkWork.connect(newWork, cloneChild, edgeProperty);
-      cloneWorkGraph(originSparkWork, newSparkWork, child, cloneChild);
-    }
-  }
-
   /**
-   * ReduceWork is not terminal work in spark, so we disable runtime skew join for
-   * some complicated cases for now, leaving them to future tasks.
-   * As an example, consider the following spark work graph:
-   * M1  M5
-   * \   /
-   *  R2 (join)   M6
-   *    \         /
-   *     R3 (join)
-   *      |
-   *     R4 (group)
-   * If we create map join task for R2, we have to clone M6 as well so that the results
-   * get joined properly.
+   * Currently, we only support the simplest case, where the join is the last work
+   * of a SparkWork, i.e. the current ReduceWork is a leaf work.
+   * If the ReduceWork has follow-up work, e.g. an aggregation following the join,
+   * it's difficult to union the results of the original join and the conditional map join
+   * and feed that to the follow-up works. This is not an issue for MR, where ReduceWork
+   * is always a terminal work.
    *
-   * Let's only support the case where downstream work of the current ReduceWork all
-   * have single parent.
+   * TODO: can we relax this?
    */
   private static boolean supportRuntimeSkewJoin(SparkWork sparkWork, BaseWork work) {
-    for (BaseWork child : sparkWork.getChildren(work)) {
-      if (sparkWork.getParents(child).size() > 1 || !supportRuntimeSkewJoin(sparkWork, child)) {
-        return false;
-      }
-    }
-    return true;
+    return sparkWork.getChildren(work).isEmpty();
   }
 }
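
For illustration, here is a minimal, framework-free sketch of the simplified constraint above: runtime skew join is now only attempted when the join's ReduceWork has no children in the SparkWork graph. The graph is reduced to a plain adjacency map, and all names (LeafWorkCheck, M1, R2, ...) are hypothetical, not part of this patch.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustration only: a SparkWork-like graph reduced to a plain adjacency map,
    // where "R2" models the ReduceWork that performs the join.
    public class LeafWorkCheck {

      // Runtime skew join is only supported when the join work has no downstream work.
      static boolean supportsRuntimeSkewJoin(Map<String, List<String>> children, String joinWork) {
        List<String> downstream = children.get(joinWork);
        return downstream == null || downstream.isEmpty();
      }

      public static void main(String[] args) {
        Map<String, List<String>> children = new HashMap<String, List<String>>();
        children.put("M1", Arrays.asList("R2"));
        children.put("M5", Arrays.asList("R2"));
        children.put("R2", Arrays.asList("R3"));                      // join feeds another work
        System.out.println(supportsRuntimeSkewJoin(children, "R2"));  // false -> skip skew join
        children.put("R2", new ArrayList<String>());                  // join is a leaf work
        System.out.println(supportsRuntimeSkewJoin(children, "R2"));  // true  -> handle skew join
      }
    }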

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java?rev=1644869&r1=1644868&r2=1644869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkMapJoinResolver.java Fri Dec 12 13:24:01 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.optimi
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -31,6 +32,7 @@ import java.util.Stack;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
@@ -43,6 +45,9 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolver;
+import org.apache.hadoop.hive.ql.plan.ConditionalResolverSkewJoin;
+import org.apache.hadoop.hive.ql.plan.ConditionalWork;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -244,7 +249,8 @@ public class SparkMapJoinResolver implem
     // all the parent SparkTasks that this new task is depend on, if they don't already exists.
     private SparkTask createSparkTask(SparkTask originalTask,
                                       SparkWork sparkWork,
-                                      Map<SparkWork, SparkTask> createdTaskMap) {
+                                      Map<SparkWork, SparkTask> createdTaskMap,
+                                      ConditionalTask conditionalTask) {
       if (createdTaskMap.containsKey(sparkWork)) {
         return createdTaskMap.get(sparkWork);
       }
@@ -252,19 +258,27 @@ public class SparkMapJoinResolver implem
           originalTask : (SparkTask) TaskFactory.get(sparkWork, physicalContext.conf);
       if (!dependencyGraph.get(sparkWork).isEmpty()) {
         for (SparkWork parentWork : dependencyGraph.get(sparkWork)) {
-          SparkTask parentTask = createSparkTask(originalTask, parentWork, createdTaskMap);
+          SparkTask parentTask =
+              createSparkTask(originalTask, parentWork, createdTaskMap, conditionalTask);
           parentTask.addDependentTask(resultTask);
         }
       } else {
         if (originalTask != resultTask) {
           List<Task<? extends Serializable>> parentTasks = originalTask.getParentTasks();
           if (parentTasks != null && parentTasks.size() > 0) {
+            // avoid concurrent modification
+            originalTask.setParentTasks(new ArrayList<Task<? extends Serializable>>());
             for (Task<? extends Serializable> parentTask : parentTasks) {
               parentTask.addDependentTask(resultTask);
+              parentTask.removeDependentTask(originalTask);
             }
           } else {
-            physicalContext.addToRootTask(resultTask);
-            physicalContext.removeFromRootTask(originalTask);
+            if (conditionalTask == null) {
+              physicalContext.addToRootTask(resultTask);
+              physicalContext.removeFromRootTask(originalTask);
+            } else {
+              updateConditionalTask(conditionalTask, originalTask, resultTask);
+            }
           }
         }
       }
@@ -277,36 +291,100 @@ public class SparkMapJoinResolver implem
     public Object dispatch(Node nd, Stack<Node> stack, Object... nos)
         throws SemanticException {
       Task<? extends Serializable> currentTask = (Task<? extends Serializable>) nd;
-      if (currentTask instanceof SparkTask) {
-        SparkTask sparkTask = (SparkTask) currentTask;
-        SparkWork sparkWork = sparkTask.getWork();
-
-        // Generate MapredLocalWorks for MJ and HTS
-        generateLocalWork(sparkTask);
-
-        dependencyGraph.put(sparkWork, new ArrayList<SparkWork>());
-        Set<BaseWork> leaves = sparkWork.getLeaves();
-        for (BaseWork leaf : leaves) {
-          moveWork(sparkWork, leaf, sparkWork);
-        }
-
-        // Now remove all BaseWorks in all the childSparkWorks that we created
-        // from the original SparkWork
-        for (SparkWork newSparkWork : sparkWorkMap.values()) {
-          for (BaseWork work : newSparkWork.getAllWorkUnsorted()) {
-            sparkWork.remove(work);
+      if (currentTask.isMapRedTask()) {
+        if (currentTask instanceof ConditionalTask) {
+          List<Task<? extends Serializable>> taskList =
+              ((ConditionalTask) currentTask).getListTasks();
+          for (Task<? extends Serializable> tsk : taskList) {
+            if (tsk instanceof SparkTask) {
+              processCurrentTask((SparkTask) tsk, (ConditionalTask) currentTask);
+            }
           }
+        } else if (currentTask instanceof SparkTask) {
+          processCurrentTask((SparkTask) currentTask, null);
         }
+      }
 
-        Map<SparkWork, SparkTask> createdTaskMap = new LinkedHashMap<SparkWork, SparkTask>();
+      return null;
+    }
 
-        // Now create SparkTasks from the SparkWorks, also set up dependency
-        for (SparkWork work : dependencyGraph.keySet()) {
-          createSparkTask(sparkTask, work, createdTaskMap);
+    /**
+     * @param sparkTask The current spark task we're processing.
+     * @param conditionalTask If not null, the current spark task is wrapped in this
+     *                        conditional task's task list.
+     */
+    private void processCurrentTask(SparkTask sparkTask, ConditionalTask conditionalTask) {
+      dependencyGraph.clear();
+      sparkWorkMap.clear();
+      SparkWork sparkWork = sparkTask.getWork();
+
+      // Generate MapredLocalWorks for MJ and HTS
+      generateLocalWork(sparkTask);
+
+      dependencyGraph.put(sparkWork, new ArrayList<SparkWork>());
+      Set<BaseWork> leaves = sparkWork.getLeaves();
+      for (BaseWork leaf : leaves) {
+        moveWork(sparkWork, leaf, sparkWork);
+      }
+
+      // Now remove all BaseWorks in all the childSparkWorks that we created
+      // from the original SparkWork
+      for (SparkWork newSparkWork : sparkWorkMap.values()) {
+        for (BaseWork work : newSparkWork.getAllWorkUnsorted()) {
+          sparkWork.remove(work);
         }
       }
 
-      return null;
+      Map<SparkWork, SparkTask> createdTaskMap = new LinkedHashMap<SparkWork, SparkTask>();
+
+      // Now create SparkTasks from the SparkWorks, also set up dependency
+      for (SparkWork work : dependencyGraph.keySet()) {
+        createSparkTask(sparkTask, work, createdTaskMap, conditionalTask);
+      }
+    }
+
+    /**
+     * Update the task/work list of this conditional task to replace originalTask with newTask.
+     * For runtime skew join, also update dirToTaskMap for the conditional resolver
+     */
+    private void updateConditionalTask(ConditionalTask conditionalTask,
+        SparkTask originalTask, SparkTask newTask) {
+      ConditionalWork conditionalWork = conditionalTask.getWork();
+      SparkWork originWork = originalTask.getWork();
+      SparkWork newWork = newTask.getWork();
+      List<Task<? extends Serializable>> listTask = conditionalTask.getListTasks();
+      List<Serializable> listWork = (List<Serializable>) conditionalWork.getListWorks();
+      int taskIndex = listTask.indexOf(originalTask);
+      int workIndex = listWork.indexOf(originWork);
+      if (taskIndex < 0 || workIndex < 0) {
+        return;
+      }
+      listTask.set(taskIndex, newTask);
+      listWork.set(workIndex, newWork);
+      ConditionalResolver resolver = conditionalTask.getResolver();
+      if (resolver instanceof ConditionalResolverSkewJoin) {
+        // get bigKeysDirToTaskMap
+        ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx context =
+            (ConditionalResolverSkewJoin.ConditionalResolverSkewJoinCtx) conditionalTask
+                .getResolverCtx();
+        HashMap<Path, Task<? extends Serializable>> bigKeysDirToTaskMap = context
+            .getDirToTaskMap();
+        // build a new map instead of modifying the one we iterate over
+        HashMap<Path, Task<? extends Serializable>> newbigKeysDirToTaskMap =
+            new HashMap<Path, Task<? extends Serializable>>();
+        // reset the resolver
+        for (Map.Entry<Path, Task<? extends Serializable>> entry :
+            bigKeysDirToTaskMap.entrySet()) {
+          Task<? extends Serializable> task = entry.getValue();
+          Path bigKeyDir = entry.getKey();
+          if (task.equals(originalTask)) {
+            newbigKeysDirToTaskMap.put(bigKeyDir, newTask);
+          } else {
+            newbigKeysDirToTaskMap.put(bigKeyDir, task);
+          }
+        }
+        context.setDirToTaskMap(newbigKeysDirToTaskMap);
+      }
     }
   }
 }
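
The core of this change is that dispatch() now also descends into a ConditionalTask and runs the map-join processing on each SparkTask in its task list, while updateConditionalTask() swaps the original task/work for the newly created ones and rebuilds the skew-join resolver's dirToTaskMap. A stripped-down sketch of that dispatch pattern follows, using hypothetical stand-in types rather than the real Task/ConditionalTask classes.

    import java.util.ArrayList;
    import java.util.List;

    // Illustration only: hypothetical stand-ins for Hive's task hierarchy.
    interface SimpleTask {}

    class SimpleSparkTask implements SimpleTask {}

    class SimpleConditionalTask implements SimpleTask {
      final List<SimpleTask> listTasks = new ArrayList<SimpleTask>();
    }

    public class DispatchSketch {

      // Mirrors the shape of dispatch(): process SparkTasks directly, and also the
      // SparkTasks wrapped in a ConditionalTask's task list (runtime skew join case).
      static void dispatch(SimpleTask current, List<SimpleSparkTask> processed) {
        if (current instanceof SimpleConditionalTask) {
          for (SimpleTask tsk : ((SimpleConditionalTask) current).listTasks) {
            if (tsk instanceof SimpleSparkTask) {
              processed.add((SimpleSparkTask) tsk);   // stands in for processCurrentTask(tsk, current)
            }
          }
        } else if (current instanceof SimpleSparkTask) {
          processed.add((SimpleSparkTask) current);   // stands in for processCurrentTask(current, null)
        }
      }

      public static void main(String[] args) {
        SimpleConditionalTask conditional = new SimpleConditionalTask();
        conditional.listTasks.add(new SimpleSparkTask());
        conditional.listTasks.add(new SimpleSparkTask());
        List<SimpleSparkTask> processed = new ArrayList<SimpleSparkTask>();
        dispatch(conditional, processed);
        System.out.println(processed.size());  // 2
      }
    }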

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1644869&r1=1644868&r2=1644869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Fri Dec 12 13:24:01 2014
@@ -80,6 +80,7 @@ import org.apache.hadoop.hive.ql.plan.Mo
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
 /**
  * SparkCompiler translates the operator plan into SparkTasks.
  *
@@ -272,6 +273,12 @@ public class SparkCompiler extends TaskC
 
     physicalCtx = new SplitSparkWorkResolver().resolve(physicalCtx);
 
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN)) {
+      (new SparkSkewJoinResolver()).resolve(physicalCtx);
+    } else {
+      LOG.debug("Skipping runtime skew join optimization");
+    }
+
     physicalCtx = new SparkMapJoinResolver().resolve(physicalCtx);
 
     if (conf.getBoolVar(HiveConf.ConfVars.HIVENULLSCANOPTIMIZE)) {
@@ -303,13 +310,6 @@ public class SparkCompiler extends TaskC
     } else {
       LOG.debug("Skipping stage id rearranger");
     }
-
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN)) {
-      // TODO: enable after HIVE-8913 is done
-      //(new SparkSkewJoinResolver()).resolve(physicalCtx);
-    } else {
-      LOG.debug("Skipping runtime skew join optimization");
-    }
     return;
   }
 }
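
With the resolver moved ahead of SparkMapJoinResolver, the conditional map-join tasks generated for skewed keys are now picked up by the map-join resolver in the same compile pass. To exercise this path, runtime skew join must be enabled; the minimal sketch below shows the relevant settings, assuming the usual HiveConf API. Only HIVESKEWJOIN appears in this patch; HIVE_EXECUTION_ENGINE is my assumption for running on the Spark engine.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class EnableRuntimeSkewJoin {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // SparkCompiler only runs SparkSkewJoinResolver when this is set.
        conf.setBoolVar(HiveConf.ConfVars.HIVESKEWJOIN, true);
        // Assumption: use the Spark execution engine (hive.execution.engine=spark).
        conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
        System.out.println(conf.getBoolVar(HiveConf.ConfVars.HIVESKEWJOIN));  // true
      }
    }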

Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join_stats.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join_stats.q.out?rev=1644869&r1=1644868&r2=1644869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join_stats.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join_stats.q.out Fri Dec 12 13:24:01 2014
@@ -182,8 +182,8 @@ POSTHOOK: query: explain select src1.key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-2 is a root stage
-  Stage-1 depends on stages: Stage-2, Stage-3
   Stage-3 depends on stages: Stage-2
+  Stage-1 depends on stages: Stage-3
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
@@ -209,6 +209,28 @@ STAGE PLANS:
             Local Work:
               Map Reduce Local Work
 
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: src2
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    Spark HashTable Sink Operator
+                      condition expressions:
+                        0 {key}
+                        1 
+                      keys:
+                        0 key (type: string)
+                        1 key (type: string)
+            Local Work:
+              Map Reduce Local Work
+
   Stage: Stage-1
     Spark
       Edges:
@@ -301,28 +323,6 @@ STAGE PLANS:
                             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                             serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
-  Stage: Stage-3
-    Spark
-#### A masked pattern was here ####
-      Vertices:
-        Map 3 
-            Map Operator Tree:
-                TableScan
-                  alias: src2
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: key is not null (type: boolean)
-                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                    Spark HashTable Sink Operator
-                      condition expressions:
-                        0 {key}
-                        1 
-                      keys:
-                        0 key (type: string)
-                        1 key (type: string)
-            Local Work:
-              Map Reduce Local Work
-
   Stage: Stage-0
     Fetch Operator
       limit: -1

Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out?rev=1644869&r1=1644868&r2=1644869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket_map_join_tez1.q.out Fri Dec 12 13:24:01 2014
@@ -314,8 +314,8 @@ join tab b on a.k1 = b.key
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-2 is a root stage
-  Stage-1 depends on stages: Stage-2, Stage-3
   Stage-3 depends on stages: Stage-2
+  Stage-1 depends on stages: Stage-3
   Stage-0 depends on stages: Stage-1
 
 STAGE PLANS:
@@ -341,6 +341,28 @@ STAGE PLANS:
             Local Work:
               Map Reduce Local Work
 
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 3 
+            Map Operator Tree:
+                TableScan
+                  alias: tab
+                  Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
+                  Filter Operator
+                    predicate: key is not null (type: boolean)
+                    Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
+                    Spark HashTable Sink Operator
+                      condition expressions:
+                        0 
+                        1 {value}
+                      keys:
+                        0 key (type: int)
+                        1 key (type: int)
+            Local Work:
+              Map Reduce Local Work
+
   Stage: Stage-1
     Spark
       Edges:
@@ -421,28 +443,6 @@ STAGE PLANS:
                             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                             serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
 
-  Stage: Stage-3
-    Spark
-#### A masked pattern was here ####
-      Vertices:
-        Map 3 
-            Map Operator Tree:
-                TableScan
-                  alias: tab
-                  Statistics: Num rows: 242 Data size: 2566 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: key is not null (type: boolean)
-                    Statistics: Num rows: 121 Data size: 1283 Basic stats: COMPLETE Column stats: NONE
-                    Spark HashTable Sink Operator
-                      condition expressions:
-                        0 
-                        1 {value}
-                      keys:
-                        0 key (type: int)
-                        1 key (type: int)
-            Local Work:
-              Map Reduce Local Work
-
   Stage: Stage-0
     Fetch Operator
       limit: -1

Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out?rev=1644869&r1=1644868&r2=1644869&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/skewjoin.q.out Fri Dec 12 13:24:01 2014
@@ -80,7 +80,10 @@ INSERT OVERWRITE TABLE dest_j1 SELECT sr
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
+  Stage-4 depends on stages: Stage-1 , consists of Stage-5, Stage-0
+  Stage-5
+  Stage-3 depends on stages: Stage-5
+  Stage-0 depends on stages: Stage-3
   Stage-2 depends on stages: Stage-0
 
 STAGE PLANS:
@@ -125,6 +128,7 @@ STAGE PLANS:
                 condition expressions:
                   0 {KEY.reducesinkkey0}
                   1 {VALUE._col0}
+                handleSkewJoin: true
                 outputColumnNames: _col0, _col6
                 Statistics: Num rows: 275 Data size: 2921 Basic stats: COMPLETE Column stats: NONE
                 Select Operator
@@ -140,6 +144,56 @@ STAGE PLANS:
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.dest_j1
 
+  Stage: Stage-4
+    Conditional Operator
+
+  Stage: Stage-5
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 5 
+            Map Operator Tree:
+                TableScan
+                  Spark HashTable Sink Operator
+                    condition expressions:
+                      0 {0_VALUE_0}
+                      1 {1_VALUE_0}
+                    keys:
+                      0 reducesinkkey0 (type: string)
+                      1 reducesinkkey0 (type: string)
+            Local Work:
+              Map Reduce Local Work
+
+  Stage: Stage-3
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 4 
+            Map Operator Tree:
+                TableScan
+                  Map Join Operator
+                    condition map:
+                         Inner Join 0 to 1
+                    condition expressions:
+                      0 {0_VALUE_0}
+                      1 {1_VALUE_0}
+                    keys:
+                      0 reducesinkkey0 (type: string)
+                      1 reducesinkkey0 (type: string)
+                    outputColumnNames: _col0, _col6
+                    Select Operator
+                      expressions: UDFToInteger(_col0) (type: int), _col6 (type: string)
+                      outputColumnNames: _col0, _col1
+                      File Output Operator
+                        compressed: false
+                        table:
+                            input format: org.apache.hadoop.mapred.TextInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                            name: default.dest_j1
+            Local Work:
+              Map Reduce Local Work
+
   Stage: Stage-0
     Move Operator
       tables: