Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/21 04:45:05 UTC

svn commit: r1633268 [1/7] - in /hive/branches/spark: itests/src/test/resources/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ ql/src/java/org/apache/hadoop/hive/ql/parse/spark/ ql/src/java/org/apache/had...

Author: xuefu
Date: Tue Oct 21 02:45:04 2014
New Revision: 1633268

URL: http://svn.apache.org/r1633268
Log:
HIVE-8436: Modify SparkWork to split works with multiple child works [Spark Branch] (Chao via Xuefu)

Added:
    hive/branches/spark/ql/src/test/queries/clientpositive/multi_insert_mixed.q
    hive/branches/spark/ql/src/test/results/clientpositive/multi_insert_mixed.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/multi_insert_mixed.q.out
Removed:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMergeTaskProcessor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java
Modified:
    hive/branches/spark/itests/src/test/resources/testconfiguration.properties
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
    hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby_cube1.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby_multi_single_reducer.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby_position.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby_rollup1.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby_sort_1_23.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby_sort_skew_1_23.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/input12.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/input13.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/input1_limit.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/input_part2.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/insert1.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/insert_into3.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/load_dyn_part1.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/load_dyn_part8.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/multi_insert.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/multi_insert_gby3.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/multi_insert_lateral_view.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/multi_insert_move_tasks_share_dependencies.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/multigroupby_singlemr.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/ppd_multi_insert.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/ppd_transform.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/subquery_multiinsert.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/union18.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/union19.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/union_remove_6.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/vectorized_ptf.q.out

Modified: hive/branches/spark/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/src/test/resources/testconfiguration.properties?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/spark/itests/src/test/resources/testconfiguration.properties Tue Oct 21 02:45:04 2014
@@ -505,6 +505,7 @@ spark.query.files=add_part_multiple.q \
   multi_insert_gby2.q \
   multi_insert_gby3.q \
   multi_insert_lateral_view.q \
+  multi_insert_mixed.q \
   multi_insert_move_tasks_share_dependencies.q \
   multigroupby_singlemr.q \
   optimize_nullscan.q \

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Oct 21 02:45:04 2014
@@ -972,6 +972,23 @@ public final class Utilities {
   }
 
   /**
+   * Clones using the powers of XML. Do not use unless necessary.
+   * @param plan The plan.
+   * @return The clone.
+   */
+  public static BaseWork cloneBaseWork(BaseWork plan) {
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.CLONE_PLAN);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
+    Configuration conf = new HiveConf();
+    serializePlan(plan, baos, conf, true);
+    BaseWork newPlan = deserializePlan(new ByteArrayInputStream(baos.toByteArray()),
+        plan.getClass(), conf, true);
+    perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.CLONE_PLAN);
+    return newPlan;
+  }
+
+  /**
    * Serialize the object. This helper function mainly makes sure that enums,
    * counters, etc are handled properly.
    */

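For readers skimming the diff: cloneBaseWork deep-copies a plan by serializing it into a byte buffer and deserializing it back out. Below is a minimal stand-alone sketch of the same round-trip idea, assuming plain java.io serialization rather than Hive's serializePlan/deserializePlan (the class and method names are invented for illustration only):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;

public final class DeepCopyDemo {

  // Deep-copies any Serializable object by round-tripping it through an in-memory buffer.
  // cloneBaseWork follows the same pattern, with Hive's plan (de)serializer instead.
  static <T extends Serializable> T deepCopy(T original) throws Exception {
    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
    try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
      oos.writeObject(original);
    }
    try (ObjectInputStream ois =
             new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()))) {
      @SuppressWarnings("unchecked")
      T copy = (T) ois.readObject();
      return copy;
    }
  }

  public static void main(String[] args) throws Exception {
    ArrayList<String> work = new ArrayList<String>(Arrays.asList("RS_1", "RS_2"));
    ArrayList<String> clone = deepCopy(work);
    clone.remove("RS_2");                      // pruning the clone...
    System.out.println(work + " / " + clone);  // ...leaves the original intact: [RS_1, RS_2] / [RS_1]
  }
}

Splitting a work relies on exactly this property: each cloned BaseWork can have branches removed without disturbing the original operator tree.
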
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveBaseFunctionResultList.java Tue Oct 21 02:45:04 2014
@@ -63,7 +63,7 @@ public abstract class HiveBaseFunctionRe
 
   @Override
   public void collect(HiveKey key, BytesWritable value) throws IOException {
-    lastRecordOutput.add(copyHiveKey(key), copyBytesWritable(value));
+    lastRecordOutput.add(copyHiveKey(key), SparkUtilities.copyBytesWritable(value));
   }
 
   private static HiveKey copyHiveKey(HiveKey key) {
@@ -74,12 +74,6 @@ public abstract class HiveBaseFunctionRe
     return copy;
   }
 
-  private static BytesWritable copyBytesWritable(BytesWritable bw) {
-    BytesWritable copy = new BytesWritable();
-    copy.set(bw);
-    return copy;
-  }
-
   /** Process the given record. */
   protected abstract void processNextRecord(T inputRecord) throws IOException;
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Tue Oct 21 02:45:04 2014
@@ -20,13 +20,9 @@ package org.apache.hadoop.hive.ql.exec.s
 
 import java.util.Iterator;
 
-import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.spark.TaskContext;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
 
 import scala.Tuple2;
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java Tue Oct 21 02:45:04 2014
@@ -19,10 +19,13 @@
 package org.apache.hadoop.hive.ql.exec.spark;
 
 import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
 import com.google.common.base.Preconditions;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
 
 public class MapInput implements SparkTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> {
   private JavaPairRDD<HiveKey, BytesWritable> hadoopRDD;
@@ -46,7 +49,24 @@ public class MapInput implements SparkTr
       JavaPairRDD<BytesWritable, BytesWritable> input) {
     Preconditions.checkArgument(input == null,
         "AssertionError: MapInput doesn't take any input");
-    return toCache ? hadoopRDD.cache() : hadoopRDD;
+    JavaPairRDD result = hadoopRDD;
+    if (toCache) {
+      result = result.mapToPair(new CopyFunction());
+      return result.cache();
+    } else {
+      return result;
+    }
+  }
+
+  private static class CopyFunction implements PairFunction<Tuple2<BytesWritable, BytesWritable>,
+        BytesWritable, BytesWritable> {
+
+    @Override
+    public Tuple2<BytesWritable, BytesWritable> call(Tuple2<BytesWritable, BytesWritable> tup) throws Exception {
+      // no need to copy the key since it never gets used in HiveMapFunction
+      BytesWritable value = SparkUtilities.copyBytesWritable(tup._2());
+      return new Tuple2<BytesWritable, BytesWritable>(tup._1(), value);
+    }
   }
 
 }

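Why the extra copy before cache(): the Hadoop record reader behind hadoopRDD reuses the same Writable instance for every record, so caching raw references would leave every cached tuple pointing at whatever was read last. Here is a stand-alone sketch of the pitfall and the fix, using only the real org.apache.hadoop.io.BytesWritable class (the "reader" loop and the demo class name are illustrative):

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.BytesWritable;

public class WritableReuseDemo {
  public static void main(String[] args) {
    BytesWritable reused = new BytesWritable();        // stands in for the record reader's buffer
    List<BytesWritable> cachedRefs = new ArrayList<BytesWritable>();
    List<BytesWritable> cachedCopies = new ArrayList<BytesWritable>();

    for (byte b = 1; b <= 3; b++) {
      reused.set(new byte[] { b }, 0, 1);              // the reader overwrites the same object
      cachedRefs.add(reused);                          // BAD: every element aliases one buffer

      BytesWritable copy = new BytesWritable();        // what SparkUtilities.copyBytesWritable does
      copy.set(reused);
      cachedCopies.add(copy);                          // GOOD: each element owns its bytes
    }

    System.out.println(cachedRefs.get(0).getBytes()[0]);   // prints 3 -- clobbered by later records
    System.out.println(cachedCopies.get(0).getBytes()[0]); // prints 1 -- preserved
  }
}
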
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Tue Oct 21 02:45:04 2014
@@ -18,15 +18,23 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
@@ -46,7 +54,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
-import org.apache.hadoop.hive.ql.plan.UnionWork;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -64,6 +71,9 @@ public class SparkPlanGenerator {
   private final JobConf jobConf;
   private Context context;
   private Path scratchDir;
+  private final Map<BaseWork, BaseWork> cloneToWork;
+  private final Map<BaseWork, SparkTran> workToTranMap;
+  private final Map<BaseWork, SparkTran> workToParentWorkTranMap;
 
   public SparkPlanGenerator(JavaSparkContext sc, Context context,
       JobConf jobConf, Path scratchDir) {
@@ -71,31 +81,31 @@ public class SparkPlanGenerator {
     this.context = context;
     this.jobConf = jobConf;
     this.scratchDir = scratchDir;
+    this.cloneToWork = new HashMap<BaseWork, BaseWork>();
+    this.workToTranMap = new HashMap<BaseWork, SparkTran>();
+    this.workToParentWorkTranMap = new HashMap<BaseWork, SparkTran>();
   }
 
   public SparkPlan generate(SparkWork sparkWork) throws Exception {
     SparkPlan sparkPlan = new SparkPlan();
-    Map<BaseWork, SparkTran> workToTranMap = new HashMap<BaseWork, SparkTran>();
+    cloneToWork.clear();
+    workToTranMap.clear();
+    workToParentWorkTranMap.clear();
+
+    splitSparkWork(sparkWork);
 
     for (BaseWork work : sparkWork.getAllWork()) {
       SparkTran tran;
       if (work instanceof MapWork) {
-        MapInput mapInput = generateMapInput((MapWork)work);
-        sparkPlan.addTran(mapInput);
+        SparkTran mapInput = generateParentTran(sparkPlan, sparkWork, work);
         tran = generate((MapWork)work);
         sparkPlan.addTran(tran);
         sparkPlan.connect(mapInput, tran);
       } else if (work instanceof ReduceWork) {
-        List<BaseWork> parentWorks = sparkWork.getParents(work);
+        SparkTran shuffleTran = generateParentTran(sparkPlan, sparkWork, work);
         tran = generate((ReduceWork)work);
         sparkPlan.addTran(tran);
-        ShuffleTran shuffleTran = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work));
-        sparkPlan.addTran(shuffleTran);
         sparkPlan.connect(shuffleTran, tran);
-        for (BaseWork parentWork : parentWorks) {
-          SparkTran parentTran = workToTranMap.get(parentWork);
-          sparkPlan.connect(parentTran, shuffleTran);
-        }
       } else {
         List<BaseWork> parentWorks = sparkWork.getParents(work);
         tran = new IdentityTran();
@@ -105,12 +115,144 @@ public class SparkPlanGenerator {
           sparkPlan.connect(parentTran, tran);
         }
       }
+
       workToTranMap.put(work, tran);
     }
 
     return sparkPlan;
   }
 
+  // Generate (or reuse a previously cached) parent SparkTran for the given work
+  private SparkTran generateParentTran(SparkPlan sparkPlan, SparkWork sparkWork, BaseWork work) throws Exception {
+    if (cloneToWork.containsKey(work)) {
+      BaseWork originalWork = cloneToWork.get(work);
+      if (workToParentWorkTranMap.containsKey(originalWork)) {
+        return workToParentWorkTranMap.get(originalWork);
+      }
+    }
+
+    SparkTran result;
+    if (work instanceof MapWork) {
+      result = generateMapInput((MapWork)work);
+      sparkPlan.addTran(result);
+    } else if (work instanceof ReduceWork) {
+      List<BaseWork> parentWorks = sparkWork.getParents(work);
+      result = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work), cloneToWork.containsKey(work));
+      sparkPlan.addTran(result);
+      for (BaseWork parentWork : parentWorks) {
+        sparkPlan.connect(workToTranMap.get(parentWork), result);
+      }
+    } else {
+      throw new IllegalStateException("AssertionError: generateParentTran() only expects MapWork or ReduceWork," +
+          " but found " + work.getClass().getName());
+    }
+
+    if (cloneToWork.containsKey(work)) {
+      workToParentWorkTranMap.put(cloneToWork.get(work), result);
+    }
+
+    return result;
+  }
+
+
+  private void splitSparkWork(SparkWork sparkWork) {
+    // do a BFS on the sparkWork graph, and look for any work that has more than one child.
+    // If we find such a work, we split it into multiple ones, one for each of its children.
+    Queue<BaseWork> queue = new LinkedList<BaseWork>();
+    Set<BaseWork> visited = new HashSet<BaseWork>();
+    queue.addAll(sparkWork.getRoots());
+    while (!queue.isEmpty()) {
+      BaseWork work = queue.poll();
+      if (!visited.add(work)) {
+        continue;
+      }
+
+      List<BaseWork> childWorks = sparkWork.getChildren(work);
+      // First, add all children of this work into queue, to be processed later.
+      for (BaseWork w : childWorks) {
+        queue.add(w);
+      }
+
+      // Second, check if this work has multiple ReduceSinks. If so, split it.
+      splitBaseWork(sparkWork, work, childWorks);
+    }
+  }
+
+  private Set<Operator<?>> getAllReduceSinks(BaseWork work) {
+    Set<Operator<?>> resultSet = work.getAllLeafOperators();
+    Iterator<Operator<?>> it = resultSet.iterator();
+    while (it.hasNext()) {
+      if (!(it.next() instanceof ReduceSinkOperator)) {
+        it.remove();
+      }
+    }
+    return resultSet;
+  }
+
+  // Split work into multiple branches, one for each childWork in childWorks.
+  // It also sets up the connections between each parent work and child work.
+  private void splitBaseWork(SparkWork sparkWork, BaseWork parentWork, List<BaseWork> childWorks) {
+    if (getAllReduceSinks(parentWork).size() <= 1) {
+      return;
+    }
+
+    // Grand-parent works - we need to set these to be the parents of the cloned works.
+    List<BaseWork> grandParentWorks = sparkWork.getParents(parentWork);
+    boolean isFirst = true;
+
+    for (BaseWork childWork : childWorks) {
+      BaseWork clonedParentWork = Utilities.cloneBaseWork(parentWork);
+      String childReducerName = childWork.getName();
+      SparkEdgeProperty clonedEdgeProperty = sparkWork.getEdgeProperty(parentWork, childWork);
+
+      // We need to remove those branches that
+      // 1. end with a ReduceSinkOperator, and
+      // 2. whose ReduceSinkOperator's output name differs from childReducerName.
+      // Also, if the cloned work is not the first, we remove ALL leaf operators except
+      // the corresponding ReduceSinkOperator.
+      for (Operator<?> op : clonedParentWork.getAllLeafOperators()) {
+        if (op instanceof ReduceSinkOperator) {
+          if (!((ReduceSinkOperator)op).getConf().getOutputName().equals(childReducerName)) {
+            removeOpRecursive(op);
+          }
+        } else if (!isFirst) {
+          removeOpRecursive(op);
+        }
+      }
+
+      isFirst = false;
+
+      // Then, we need to set up the graph connection. Especially:
+      // 1, we need to connect this cloned parent work with all the grand-parent works.
+      // 2, we need to connect this cloned parent work with the corresponding child work.
+      sparkWork.add(clonedParentWork);
+      for (BaseWork gpw : grandParentWorks) {
+        sparkWork.connect(gpw, clonedParentWork, sparkWork.getEdgeProperty(gpw, parentWork));
+      }
+      sparkWork.connect(clonedParentWork, childWork, clonedEdgeProperty);
+      cloneToWork.put(clonedParentWork, parentWork);
+    }
+
+    sparkWork.remove(parentWork);
+  }
+
+  // Remove op from all of its parents' child lists.
+  // Recursively remove any parent that has only this op as a child.
+  private void removeOpRecursive(Operator<?> operator) {
+    List<Operator<?>> parentOperators = new ArrayList<Operator<?>>();
+    for (Operator<?> op : operator.getParentOperators()) {
+      parentOperators.add(op);
+    }
+    for (Operator<?> parentOperator : parentOperators) {
+      Preconditions.checkArgument(parentOperator.getChildOperators().contains(operator),
+          "AssertionError: parent of " + operator.getName() + " doesn't have it as child.");
+      parentOperator.removeChild(operator);
+      if (parentOperator.getNumChild() == 0) {
+        removeOpRecursive(parentOperator);
+      }
+    }
+  }
+
   private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
     // MergeFileWork is sub-class of MapWork, we don't need to distinguish here
     if (mWork.getInputformat() != null) {
@@ -147,10 +289,10 @@ public class SparkPlanGenerator {
 
     JavaPairRDD<HiveKey, BytesWritable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
         WritableComparable.class, Writable.class);
-    return new MapInput(hadoopRDD);
+    return new MapInput(hadoopRDD, false /*TODO: fix this after resolving HIVE-8457: cloneToWork.containsKey(mapWork)*/);
   }
 
-  private ShuffleTran generate(SparkEdgeProperty edge) {
+  private ShuffleTran generate(SparkEdgeProperty edge, boolean needCache) {
     Preconditions.checkArgument(!edge.isShuffleNone(),
         "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
     SparkShuffler shuffler;
@@ -161,7 +303,7 @@ public class SparkPlanGenerator {
     } else {
       shuffler = new GroupByShuffler();
     }
-    return new ShuffleTran(shuffler, edge.getNumPartitions());
+    return new ShuffleTran(shuffler, edge.getNumPartitions(), needCache);
   }
 
   private MapTran generate(MapWork mw) throws Exception {

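To summarize what splitSparkWork/splitBaseWork do: the work graph is walked breadth-first, and any work with more than one child is replaced by one clone per child, each clone wired to the grand-parent works and to exactly one child. Below is a toy, non-Hive sketch of that graph surgery, with invented work names and plain maps standing in for SparkWork (the pruning of operator branches inside each clone is omitted for brevity):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SplitWorkDemo {
  // A toy DAG: work name -> children / parents. In Hive this state lives inside SparkWork.
  static Map<String, List<String>> children = new LinkedHashMap<String, List<String>>();
  static Map<String, List<String>> parents = new LinkedHashMap<String, List<String>>();

  static void connect(String parent, String child) {
    children.computeIfAbsent(parent, k -> new ArrayList<>()).add(child);
    children.computeIfAbsent(child, k -> new ArrayList<>());
    parents.computeIfAbsent(child, k -> new ArrayList<>()).add(parent);
    parents.computeIfAbsent(parent, k -> new ArrayList<>());
  }

  public static void main(String[] args) {
    // "Map 1" feeds two reducers -- the shape a mixed multi-insert query produces.
    connect("Map 1", "Reducer 2");
    connect("Map 1", "Reducer 3");

    // BFS from the roots, splitting any work that has more than one child.
    Deque<String> queue = new ArrayDeque<String>(Arrays.asList("Map 1"));
    Set<String> visited = new HashSet<String>();
    while (!queue.isEmpty()) {
      String work = queue.poll();
      if (!visited.add(work)) {
        continue;
      }
      List<String> kids = new ArrayList<String>(children.get(work));
      queue.addAll(kids);
      if (kids.size() <= 1) {
        continue;
      }

      // One clone per child; each clone keeps the original's parents and exactly one child.
      int i = 1;
      for (String child : kids) {
        String clone = work + " (clone " + i++ + ")";
        children.put(clone, new ArrayList<String>(Arrays.asList(child)));
        parents.put(clone, new ArrayList<String>(parents.get(work)));
        parents.get(child).remove(work);
        parents.get(child).add(clone);
      }
      // Finally drop the original multi-child work, like sparkWork.remove(parentWork).
      children.remove(work);
      parents.remove(work);
    }

    // {Reducer 2=[], Reducer 3=[], Map 1 (clone 1)=[Reducer 2], Map 1 (clone 2)=[Reducer 3]}
    System.out.println(children);
  }
}
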
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java Tue Oct 21 02:45:04 2014
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.exec.spark;
 
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.TaskContext;
 
@@ -33,4 +34,11 @@ public class SparkUtilities {
     jobConf.set("mapred.task.id",
         String.format("%06d_%02d", taskContext.getPartitionId(), taskContext.getAttemptId()));
   }
+
+  public static BytesWritable copyBytesWritable(BytesWritable bw) {
+    BytesWritable copy = new BytesWritable();
+    copy.set(bw);
+    return copy;
+  }
+
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java Tue Oct 21 02:45:04 2014
@@ -77,10 +77,6 @@ public class GenSparkProcContext impleme
   // walk.
   public Operator<? extends OperatorDesc> parentOfRoot;
 
-  // Default task is the task we use for those operators that are not connected
-  // to the newly generated TS
-  public SparkTask defaultTask;
-
   // Spark task we're currently processing
   public SparkTask currentTask;
 
@@ -88,20 +84,6 @@ public class GenSparkProcContext impleme
   // one.
   public BaseWork preceedingWork;
 
-  // All operators that we should unlink with their parents, for multi-table insertion
-  // It's a mapping from operator to its ONLY parent.
-  public Map<Operator<?>, Operator<?>> opToParentMap;
-
-  // A mapping from operators to their corresponding tasks.
-  // The key for this map could only be:
-  //  1. TableScanOperators (so we know which task for the tree rooted at this TS)
-  //  2. FileSinkOperators (need this info in GenSparkUtils::processFileSinks)
-  //  3. UnionOperator/JoinOperator (need for merging tasks)
-  public final Map<Operator<?>, SparkTask> opToTaskMap;
-
-  // temporary TS generated for multi-table insertion
-  public final Set<TableScanOperator> tempTS;
-
   // map that keeps track of the last operator of a task to the work
   // that follows it. This is used for connecting them later.
   public final Map<Operator<?>, BaseWork> leafOperatorToFollowingWork;
@@ -157,10 +139,9 @@ public class GenSparkProcContext impleme
     this.rootTasks = rootTasks;
     this.inputs = inputs;
     this.outputs = outputs;
-    this.defaultTask = (SparkTask) TaskFactory.get(
+    this.currentTask = (SparkTask) TaskFactory.get(
         new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
-    this.rootTasks.add(defaultTask);
-    this.currentTask = null;
+    this.rootTasks.add(currentTask);
     this.leafOperatorToFollowingWork = new LinkedHashMap<Operator<?>, BaseWork>();
     this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, SparkEdgeProperty>>();
     this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>();
@@ -178,8 +159,5 @@ public class GenSparkProcContext impleme
     this.clonedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
     this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
     this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
-    this.opToParentMap = new LinkedHashMap<Operator<?>, Operator<?>>();
-    this.opToTaskMap = new LinkedHashMap<Operator<?>, SparkTask>();
-    this.tempTS = new LinkedHashSet<TableScanOperator>();
   }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Tue Oct 21 02:45:04 2014
@@ -170,16 +170,6 @@ public class GenSparkUtils {
     return mapWork;
   }
 
-  // Create a MapWork for a temporary TableScanOperator
-  // Basically a thin wrapper on GenMapRedUtils.setTaskPlan.
-  public MapWork createMapWork(TableScanOperator root,
-                               SparkWork sparkWork, String path, TableDesc tt_desc) throws SemanticException {
-    MapWork mapWork = new MapWork("Map " + (++sequenceNumber));
-    GenMapRedUtils.setTaskPlan(path, path, root, mapWork, false, tt_desc);
-    sparkWork.add(mapWork);
-    return mapWork;
-  }
-
   // this method's main use is to help unit testing this class
   protected void setupMapWork(MapWork mapWork, GenSparkProcContext context,
       PrunedPartitionList partitions, Operator<? extends OperatorDesc> root,
@@ -274,19 +264,15 @@ public class GenSparkUtils {
       throws SemanticException {
 
     ParseContext parseContext = context.parseContext;
-    Preconditions.checkArgument(context.opToTaskMap.containsKey(fileSink),
-        "AssertionError: the fileSink " + fileSink.getName() + " should be in the context");
-
-    SparkTask currentTask = context.opToTaskMap.get(fileSink);
 
     boolean isInsertTable = // is INSERT OVERWRITE TABLE
         GenMapRedUtils.isInsertInto(parseContext, fileSink);
     HiveConf hconf = parseContext.getConf();
 
     boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
-        hconf, fileSink, currentTask, isInsertTable);
+        hconf, fileSink, context.currentTask, isInsertTable);
 
-    Path finalName = GenMapRedUtils.createMoveTask(currentTask,
+    Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
         chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
 
     if (chDir) {
@@ -295,13 +281,13 @@ public class GenSparkUtils {
       logger.info("using CombineHiveInputformat for the merge job");
       GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
           context.dependencyTask, context.moveTask,
-          hconf, currentTask);
+          hconf, context.currentTask);
     }
 
     FetchTask fetchTask = parseContext.getFetchTask();
-    if (fetchTask != null && currentTask.getNumChild() == 0) {
+    if (fetchTask != null && context.currentTask.getNumChild() == 0) {
       if (fetchTask.isFetchFrom(fileSink.getConf())) {
-        currentTask.setFetchSource(true);
+        context.currentTask.setFetchSource(true);
       }
     }
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Tue Oct 21 02:45:04 2014
@@ -90,10 +90,6 @@ public class GenSparkWork implements Nod
       return null;
     }
 
-    if (operator instanceof FileSinkOperator) {
-      context.opToTaskMap.put(operator, context.currentTask);
-    }
-
     SparkWork sparkWork = context.currentTask.getWork();
 
     // Right now the work graph is pretty simple. If there is no

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Tue Oct 21 02:45:04 2014
@@ -34,7 +34,6 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -139,39 +138,9 @@ public class SparkCompiler extends TaskC
     GenSparkProcContext procCtx = new GenSparkProcContext(
         conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
 
-    // -------------------- First Pass ---------------------
-
-    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp("TS", TableScanOperator.getOperatorName() + "%"),
-        new SparkTableScanProcessor());
-
-    Dispatcher disp = new DefaultRuleDispatcher(new SparkMultiInsertionProcessor(), opRules, procCtx);
-    ArrayList<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pCtx.getTopOps().values());
-    GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
-    ogw.startWalking(topNodes, null);
-
-    // ------------------- Second Pass ----------------------
-
-    // Merge tasks upon Join/Union if possible
-    opRules.clear();
-    opRules.put(new RuleRegExp("Join", JoinOperator.getOperatorName() + "%"),
-        new SparkMergeTaskProcessor());
-    opRules.put(new RuleRegExp("Union", UnionOperator.getOperatorName() + "%"),
-        new SparkMergeTaskProcessor());
-    disp = new DefaultRuleDispatcher(null, opRules, procCtx);
-    topNodes = new ArrayList<Node>();
-    topNodes.addAll(procCtx.tempTS); // First process temp TS
-    topNodes.addAll(pCtx.getTopOps().values());
-    ogw = new GenSparkWorkWalker(disp, procCtx);
-    ogw.startWalking(topNodes, null);
-
-
-    // ------------------- Third Pass -----------------------
-
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack. The dispatcher generates the plan from the operator tree
-    opRules.clear();
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
     opRules.put(new RuleRegExp("Split Work - ReduceSink",
         ReduceSinkOperator.getOperatorName() + "%"), genSparkWork);
 
@@ -184,17 +153,7 @@ public class SparkCompiler extends TaskC
 
     opRules.put(new RuleRegExp("Handle Analyze Command",
         TableScanOperator.getOperatorName() + "%"),
-        new CompositeProcessor(
-            new NodeProcessor() {
-              @Override
-              public Object process(Node nd, Stack<Node> s,
-                                    NodeProcessorCtx procCtx, Object... no) throws SemanticException {
-                GenSparkProcContext context = (GenSparkProcContext) procCtx;
-                context.currentTask = context.opToTaskMap.get(nd);
-                return null;
-              }
-            },
-            new SparkProcessAnalyzeTable(GenSparkUtils.getUtils())));
+        new SparkProcessAnalyzeTable(GenSparkUtils.getUtils()));
 
     opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"),
         new NodeProcessor() {
@@ -213,11 +172,10 @@ public class SparkCompiler extends TaskC
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
-    disp = new DefaultRuleDispatcher(null, opRules, procCtx);
-    topNodes = new ArrayList<Node>();
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pCtx.getTopOps().values());
-    topNodes.addAll(procCtx.tempTS);
-    ogw = new GenSparkWorkWalker(disp, procCtx);
+    GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
 
     // we need to clone some operator plans and remove union operators still

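With the first two passes removed, the compiler builds its plan in a single walk driven by the usual rule-dispatcher pattern: each operator visited by the walker is matched against a set of name rules and the closest matching processor fires. Here is a toy, non-Hive sketch of that dispatch idea, with java.util.regex.Pattern standing in for RuleRegExp and Consumer for NodeProcessor (the operator names are illustrative):

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.regex.Pattern;

public class RuleDispatchDemo {
  public static void main(String[] args) {
    // Rules keyed by an operator-name pattern, mirroring opRules.put(new RuleRegExp(...), processor).
    Map<Pattern, Consumer<String>> rules = new LinkedHashMap<Pattern, Consumer<String>>();
    rules.put(Pattern.compile("RS_\\d+"), op -> System.out.println("Split Work - ReduceSink: " + op));
    rules.put(Pattern.compile("FS_\\d+"), op -> System.out.println("No more walking: " + op));
    rules.put(Pattern.compile("TS_\\d+"), op -> System.out.println("Handle Analyze Command: " + op));

    // The walker visits operators in DFS order; this sketch fires the first matching rule
    // (Hive's DefaultRuleDispatcher picks the closest match; first match keeps the sketch short).
    String[] walkOrder = { "TS_0", "SEL_1", "RS_3", "FS_7" };
    for (String op : walkOrder) {
      rules.entrySet().stream()
          .filter(e -> e.getKey().matcher(op).matches())
          .findFirst()
          .map(Map.Entry::getValue)
          .orElse(name -> System.out.println("default processor: " + name))
          .accept(op);
    }
  }
}
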
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java Tue Oct 21 02:45:04 2014
@@ -79,10 +79,6 @@ public class SparkProcessAnalyzeTable im
     GenSparkProcContext context = (GenSparkProcContext) procContext;
 
     TableScanOperator tableScan = (TableScanOperator) nd;
-    // If this tableScan is a generated one for multi-insertion, ignore it
-    if (context.tempTS.contains(tableScan)) {
-      return null;
-    }
 
     ParseContext parseContext = context.parseContext;
     Class<? extends InputFormat> inputFormat = parseContext.getTopToTable().get(tableScan)

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java Tue Oct 21 02:45:04 2014
@@ -113,6 +113,31 @@ public abstract class BaseWork extends A
     return returnSet;
   }
 
+  /**
+   * Returns a set containing all leaf operators from the operator tree in this work.
+   * @return a set containing all leaf operators in this operator tree.
+   */
+  public Set<Operator<?>> getAllLeafOperators() {
+    Set<Operator<?>> returnSet = new LinkedHashSet<Operator<?>>();
+    Set<Operator<?>> opSet = getAllRootOperators();
+    Stack<Operator<?>> opStack = new Stack<Operator<?>>();
+
+    // seed the stack with all root operators
+    opStack.addAll(opSet);
+
+    while(!opStack.empty()) {
+      Operator<?> op = opStack.pop();
+      if (op.getNumChild() == 0) {
+        returnSet.add(op);
+      }
+      if (op.getChildOperators() != null) {
+        opStack.addAll(op.getChildOperators());
+      }
+    }
+
+    return returnSet;
+  }
+
   public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
     return scratchColumnVectorTypes;
   }

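The new BaseWork.getAllLeafOperators() collects leaves with an explicit stack instead of recursion; splitBaseWork uses it to find the ReduceSink branches to prune. A stand-alone sketch of the same traversal on a toy node type (Node here is invented; in Hive the elements are Operator<?> and the starting points come from getAllRootOperators()):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class LeafDemo {
  static class Node {
    final String name;
    final List<Node> children = new ArrayList<Node>();
    Node(String name) { this.name = name; }
    Node child(Node c) { children.add(c); return this; }
  }

  // Collects every node with no children -- the same shape as getAllLeafOperators().
  static Set<Node> leaves(Collection<Node> roots) {
    Set<Node> result = new LinkedHashSet<Node>();
    Deque<Node> stack = new ArrayDeque<Node>(roots);
    while (!stack.isEmpty()) {
      Node op = stack.pop();
      if (op.children.isEmpty()) {
        result.add(op);
      }
      stack.addAll(op.children);
    }
    return result;
  }

  public static void main(String[] args) {
    Node ts = new Node("TS"), sel = new Node("SEL"), rs = new Node("RS"), fs = new Node("FS");
    ts.child(sel);
    sel.child(rs).child(fs);           // SEL fans out to a ReduceSink leaf and a FileSink leaf
    for (Node leaf : leaves(Arrays.asList(ts))) {
      System.out.println(leaf.name);   // prints RS then FS
    }
  }
}
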
Added: hive/branches/spark/ql/src/test/queries/clientpositive/multi_insert_mixed.q
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/queries/clientpositive/multi_insert_mixed.q?rev=1633268&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/queries/clientpositive/multi_insert_mixed.q (added)
+++ hive/branches/spark/ql/src/test/queries/clientpositive/multi_insert_mixed.q Tue Oct 21 02:45:04 2014
@@ -0,0 +1,21 @@
+create table src_multi1 like src;
+create table src_multi2 like src;
+create table src_multi3 like src;
+
+-- Testing the case where a map work contains both shuffling (ReduceSinkOperator)
+-- and inserting to output table (FileSinkOperator).
+
+explain
+from src
+insert overwrite table src_multi1 select key, count(1) group by key order by key
+insert overwrite table src_multi2 select value, count(1) group by value order by value
+insert overwrite table src_multi3 select * where key < 10;
+
+from src
+insert overwrite table src_multi1 select key, count(1) group by key order by key
+insert overwrite table src_multi2 select value, count(1) group by value order by value
+insert overwrite table src_multi3 select * where key < 10;
+
+select * from src_multi1;
+select * from src_multi2;
+select * from src_multi3;

Added: hive/branches/spark/ql/src/test/results/clientpositive/multi_insert_mixed.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/multi_insert_mixed.q.out?rev=1633268&view=auto
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/multi_insert_mixed.q.out (added)
+++ hive/branches/spark/ql/src/test/results/clientpositive/multi_insert_mixed.q.out Tue Oct 21 02:45:04 2014
@@ -0,0 +1,915 @@
+PREHOOK: query: create table src_multi1 like src
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@src_multi1
+POSTHOOK: query: create table src_multi1 like src
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@src_multi1
+PREHOOK: query: create table src_multi2 like src
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@src_multi2
+POSTHOOK: query: create table src_multi2 like src
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@src_multi2
+PREHOOK: query: create table src_multi3 like src
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@src_multi3
+POSTHOOK: query: create table src_multi3 like src
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@src_multi3
+PREHOOK: query: -- Testing the case where a map work contains both shuffling (ReduceSinkOperator)
+-- and inserting to output table (FileSinkOperator).
+
+explain
+from src
+insert overwrite table src_multi1 select key, count(1) group by key order by key
+insert overwrite table src_multi2 select value, count(1) group by value order by value
+insert overwrite table src_multi3 select * where key < 10
+PREHOOK: type: QUERY
+POSTHOOK: query: -- Testing the case where a map work contains both shuffling (ReduceSinkOperator)
+-- and inserting to output table (FileSinkOperator).
+
+explain
+from src
+insert overwrite table src_multi1 select key, count(1) group by key order by key
+insert overwrite table src_multi2 select value, count(1) group by value order by value
+insert overwrite table src_multi3 select * where key < 10
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-3 is a root stage
+  Stage-4 depends on stages: Stage-3
+  Stage-0 depends on stages: Stage-4
+  Stage-5 depends on stages: Stage-0
+  Stage-6 depends on stages: Stage-3
+  Stage-7 depends on stages: Stage-6
+  Stage-1 depends on stages: Stage-7
+  Stage-8 depends on stages: Stage-1
+  Stage-2 depends on stages: Stage-7
+  Stage-9 depends on stages: Stage-2
+
+STAGE PLANS:
+  Stage: Stage-3
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            alias: src
+            Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+            Select Operator
+              expressions: key (type: string)
+              outputColumnNames: key
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                aggregations: count(1)
+                keys: key (type: string)
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                Reduce Output Operator
+                  key expressions: _col0 (type: string)
+                  sort order: +
+                  Map-reduce partition columns: _col0 (type: string)
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                  value expressions: _col1 (type: bigint)
+            Select Operator
+              expressions: value (type: string)
+              outputColumnNames: value
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+              Group By Operator
+                aggregations: count(1)
+                keys: value (type: string)
+                mode: hash
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  table:
+                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+            Filter Operator
+              predicate: (key < 10) (type: boolean)
+              Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+              Select Operator
+                expressions: key (type: string), value (type: string)
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+                File Output Operator
+                  compressed: false
+                  Statistics: Num rows: 166 Data size: 1763 Basic stats: COMPLETE Column stats: NONE
+                  table:
+                      input format: org.apache.hadoop.mapred.TextInputFormat
+                      output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                      name: default.src_multi3
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: count(VALUE._col0)
+          keys: KEY._col0 (type: string)
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+          Select Operator
+            expressions: _col0 (type: string), _col1 (type: bigint)
+            outputColumnNames: _col0, _col1
+            Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-4
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Reduce Output Operator
+              key expressions: _col0 (type: string)
+              sort order: +
+              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+              value expressions: _col1 (type: bigint)
+      Reduce Operator Tree:
+        Select Operator
+          expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: bigint)
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                name: default.src_multi1
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.src_multi1
+
+  Stage: Stage-5
+    Stats-Aggr Operator
+
+  Stage: Stage-6
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Reduce Output Operator
+              key expressions: _col0 (type: string)
+              sort order: +
+              Map-reduce partition columns: _col0 (type: string)
+              Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+              value expressions: _col1 (type: bigint)
+      Reduce Operator Tree:
+        Group By Operator
+          aggregations: count(VALUE._col0)
+          keys: KEY._col0 (type: string)
+          mode: mergepartial
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+          Select Operator
+            expressions: _col0 (type: string), _col1 (type: bigint)
+            outputColumnNames: _col0, _col1
+            Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+            File Output Operator
+              compressed: false
+              table:
+                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-7
+    Map Reduce
+      Map Operator Tree:
+          TableScan
+            Reduce Output Operator
+              key expressions: _col0 (type: string)
+              sort order: +
+              Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+              value expressions: _col1 (type: bigint)
+      Reduce Operator Tree:
+        Select Operator
+          expressions: KEY.reducesinkkey0 (type: string), VALUE._col0 (type: bigint)
+          outputColumnNames: _col0, _col1
+          Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+          File Output Operator
+            compressed: false
+            Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+            table:
+                input format: org.apache.hadoop.mapred.TextInputFormat
+                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                name: default.src_multi2
+
+  Stage: Stage-1
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.src_multi2
+
+  Stage: Stage-8
+    Stats-Aggr Operator
+
+  Stage: Stage-2
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.src_multi3
+
+  Stage: Stage-9
+    Stats-Aggr Operator
+
+PREHOOK: query: from src
+insert overwrite table src_multi1 select key, count(1) group by key order by key
+insert overwrite table src_multi2 select value, count(1) group by value order by value
+insert overwrite table src_multi3 select * where key < 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@src_multi1
+PREHOOK: Output: default@src_multi2
+PREHOOK: Output: default@src_multi3
+POSTHOOK: query: from src
+insert overwrite table src_multi1 select key, count(1) group by key order by key
+insert overwrite table src_multi2 select value, count(1) group by value order by value
+insert overwrite table src_multi3 select * where key < 10
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@src_multi1
+POSTHOOK: Output: default@src_multi2
+POSTHOOK: Output: default@src_multi3
+POSTHOOK: Lineage: src_multi1.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: src_multi1.value EXPRESSION [(src)src.null, ]
+POSTHOOK: Lineage: src_multi2.key SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: src_multi2.value EXPRESSION [(src)src.null, ]
+POSTHOOK: Lineage: src_multi3.key SIMPLE [(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Lineage: src_multi3.value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select * from src_multi1
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src_multi1
+#### A masked pattern was here ####
+POSTHOOK: query: select * from src_multi1
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src_multi1
+#### A masked pattern was here ####
+0	3
+10	1
+100	2
+103	2
+104	2
+105	1
+11	1
+111	1
+113	2
+114	1
+116	1
+118	2
+119	3
+12	2
+120	2
+125	2
+126	1
+128	3
+129	2
+131	1
+133	1
+134	2
+136	1
+137	2
+138	4
+143	1
+145	1
+146	2
+149	2
+15	2
+150	1
+152	2
+153	1
+155	1
+156	1
+157	1
+158	1
+160	1
+162	1
+163	1
+164	2
+165	2
+166	1
+167	3
+168	1
+169	4
+17	1
+170	1
+172	2
+174	2
+175	2
+176	2
+177	1
+178	1
+179	2
+18	2
+180	1
+181	1
+183	1
+186	1
+187	3
+189	1
+19	1
+190	1
+191	2
+192	1
+193	3
+194	1
+195	2
+196	1
+197	2
+199	3
+2	1
+20	1
+200	2
+201	1
+202	1
+203	2
+205	2
+207	2
+208	3
+209	2
+213	2
+214	1
+216	2
+217	2
+218	1
+219	2
+221	2
+222	1
+223	2
+224	2
+226	1
+228	1
+229	2
+230	5
+233	2
+235	1
+237	2
+238	2
+239	2
+24	2
+241	1
+242	2
+244	1
+247	1
+248	1
+249	1
+252	1
+255	2
+256	2
+257	1
+258	1
+26	2
+260	1
+262	1
+263	1
+265	2
+266	1
+27	1
+272	2
+273	3
+274	1
+275	1
+277	4
+278	2
+28	1
+280	2
+281	2
+282	2
+283	1
+284	1
+285	1
+286	1
+287	1
+288	2
+289	1
+291	1
+292	1
+296	1
+298	3
+30	1
+302	1
+305	1
+306	1
+307	2
+308	1
+309	2
+310	1
+311	3
+315	1
+316	3
+317	2
+318	3
+321	2
+322	2
+323	1
+325	2
+327	3
+33	1
+331	2
+332	1
+333	2
+335	1
+336	1
+338	1
+339	1
+34	1
+341	1
+342	2
+344	2
+345	1
+348	5
+35	3
+351	1
+353	2
+356	1
+360	1
+362	1
+364	1
+365	1
+366	1
+367	2
+368	1
+369	3
+37	2
+373	1
+374	1
+375	1
+377	1
+378	1
+379	1
+382	2
+384	3
+386	1
+389	1
+392	1
+393	1
+394	1
+395	2
+396	3
+397	2
+399	2
+4	1
+400	1
+401	5
+402	1
+403	3
+404	2
+406	4
+407	1
+409	3
+41	1
+411	1
+413	2
+414	2
+417	3
+418	1
+419	1
+42	2
+421	1
+424	2
+427	1
+429	2
+43	1
+430	3
+431	3
+432	1
+435	1
+436	1
+437	1
+438	3
+439	2
+44	1
+443	1
+444	1
+446	1
+448	1
+449	1
+452	1
+453	1
+454	3
+455	1
+457	1
+458	2
+459	2
+460	1
+462	2
+463	2
+466	3
+467	1
+468	4
+469	5
+47	1
+470	1
+472	1
+475	1
+477	1
+478	2
+479	1
+480	3
+481	1
+482	1
+483	1
+484	1
+485	1
+487	1
+489	4
+490	1
+491	1
+492	2
+493	1
+494	1
+495	1
+496	1
+497	1
+498	3
+5	3
+51	2
+53	1
+54	1
+57	1
+58	2
+64	1
+65	1
+66	1
+67	2
+69	1
+70	3
+72	2
+74	1
+76	2
+77	1
+78	1
+8	1
+80	1
+82	1
+83	2
+84	2
+85	1
+86	1
+87	1
+9	1
+90	3
+92	1
+95	2
+96	1
+97	2
+98	2
+PREHOOK: query: select * from src_multi2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src_multi2
+#### A masked pattern was here ####
+POSTHOOK: query: select * from src_multi2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src_multi2
+#### A masked pattern was here ####
+val_0	3
+val_10	1
+val_100	2
+val_103	2
+val_104	2
+val_105	1
+val_11	1
+val_111	1
+val_113	2
+val_114	1
+val_116	1
+val_118	2
+val_119	3
+val_12	2
+val_120	2
+val_125	2
+val_126	1
+val_128	3
+val_129	2
+val_131	1
+val_133	1
+val_134	2
+val_136	1
+val_137	2
+val_138	4
+val_143	1
+val_145	1
+val_146	2
+val_149	2
+val_15	2
+val_150	1
+val_152	2
+val_153	1
+val_155	1
+val_156	1
+val_157	1
+val_158	1
+val_160	1
+val_162	1
+val_163	1
+val_164	2
+val_165	2
+val_166	1
+val_167	3
+val_168	1
+val_169	4
+val_17	1
+val_170	1
+val_172	2
+val_174	2
+val_175	2
+val_176	2
+val_177	1
+val_178	1
+val_179	2
+val_18	2
+val_180	1
+val_181	1
+val_183	1
+val_186	1
+val_187	3
+val_189	1
+val_19	1
+val_190	1
+val_191	2
+val_192	1
+val_193	3
+val_194	1
+val_195	2
+val_196	1
+val_197	2
+val_199	3
+val_2	1
+val_20	1
+val_200	2
+val_201	1
+val_202	1
+val_203	2
+val_205	2
+val_207	2
+val_208	3
+val_209	2
+val_213	2
+val_214	1
+val_216	2
+val_217	2
+val_218	1
+val_219	2
+val_221	2
+val_222	1
+val_223	2
+val_224	2
+val_226	1
+val_228	1
+val_229	2
+val_230	5
+val_233	2
+val_235	1
+val_237	2
+val_238	2
+val_239	2
+val_24	2
+val_241	1
+val_242	2
+val_244	1
+val_247	1
+val_248	1
+val_249	1
+val_252	1
+val_255	2
+val_256	2
+val_257	1
+val_258	1
+val_26	2
+val_260	1
+val_262	1
+val_263	1
+val_265	2
+val_266	1
+val_27	1
+val_272	2
+val_273	3
+val_274	1
+val_275	1
+val_277	4
+val_278	2
+val_28	1
+val_280	2
+val_281	2
+val_282	2
+val_283	1
+val_284	1
+val_285	1
+val_286	1
+val_287	1
+val_288	2
+val_289	1
+val_291	1
+val_292	1
+val_296	1
+val_298	3
+val_30	1
+val_302	1
+val_305	1
+val_306	1
+val_307	2
+val_308	1
+val_309	2
+val_310	1
+val_311	3
+val_315	1
+val_316	3
+val_317	2
+val_318	3
+val_321	2
+val_322	2
+val_323	1
+val_325	2
+val_327	3
+val_33	1
+val_331	2
+val_332	1
+val_333	2
+val_335	1
+val_336	1
+val_338	1
+val_339	1
+val_34	1
+val_341	1
+val_342	2
+val_344	2
+val_345	1
+val_348	5
+val_35	3
+val_351	1
+val_353	2
+val_356	1
+val_360	1
+val_362	1
+val_364	1
+val_365	1
+val_366	1
+val_367	2
+val_368	1
+val_369	3
+val_37	2
+val_373	1
+val_374	1
+val_375	1
+val_377	1
+val_378	1
+val_379	1
+val_382	2
+val_384	3
+val_386	1
+val_389	1
+val_392	1
+val_393	1
+val_394	1
+val_395	2
+val_396	3
+val_397	2
+val_399	2
+val_4	1
+val_400	1
+val_401	5
+val_402	1
+val_403	3
+val_404	2
+val_406	4
+val_407	1
+val_409	3
+val_41	1
+val_411	1
+val_413	2
+val_414	2
+val_417	3
+val_418	1
+val_419	1
+val_42	2
+val_421	1
+val_424	2
+val_427	1
+val_429	2
+val_43	1
+val_430	3
+val_431	3
+val_432	1
+val_435	1
+val_436	1
+val_437	1
+val_438	3
+val_439	2
+val_44	1
+val_443	1
+val_444	1
+val_446	1
+val_448	1
+val_449	1
+val_452	1
+val_453	1
+val_454	3
+val_455	1
+val_457	1
+val_458	2
+val_459	2
+val_460	1
+val_462	2
+val_463	2
+val_466	3
+val_467	1
+val_468	4
+val_469	5
+val_47	1
+val_470	1
+val_472	1
+val_475	1
+val_477	1
+val_478	2
+val_479	1
+val_480	3
+val_481	1
+val_482	1
+val_483	1
+val_484	1
+val_485	1
+val_487	1
+val_489	4
+val_490	1
+val_491	1
+val_492	2
+val_493	1
+val_494	1
+val_495	1
+val_496	1
+val_497	1
+val_498	3
+val_5	3
+val_51	2
+val_53	1
+val_54	1
+val_57	1
+val_58	2
+val_64	1
+val_65	1
+val_66	1
+val_67	2
+val_69	1
+val_70	3
+val_72	2
+val_74	1
+val_76	2
+val_77	1
+val_78	1
+val_8	1
+val_80	1
+val_82	1
+val_83	2
+val_84	2
+val_85	1
+val_86	1
+val_87	1
+val_9	1
+val_90	3
+val_92	1
+val_95	2
+val_96	1
+val_97	2
+val_98	2
+PREHOOK: query: select * from src_multi3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src_multi3
+#### A masked pattern was here ####
+POSTHOOK: query: select * from src_multi3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src_multi3
+#### A masked pattern was here ####
+0	val_0
+4	val_4
+8	val_8
+0	val_0
+0	val_0
+5	val_5
+5	val_5
+2	val_2
+5	val_5
+9	val_9

Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map.q.out?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map.q.out Tue Oct 21 02:45:04 2014
@@ -30,40 +30,25 @@ INSERT OVERWRITE TABLE DEST2 SELECT SRC.
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-2 is a root stage
-  Stage-4 depends on stages: Stage-2
-  Stage-3 depends on stages: Stage-4, Stage-5
+  Stage-3 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-3
-  Stage-6 depends on stages: Stage-0
+  Stage-4 depends on stages: Stage-0
   Stage-1 depends on stages: Stage-3
-  Stage-7 depends on stages: Stage-1
-  Stage-5 depends on stages: Stage-2
+  Stage-5 depends on stages: Stage-1
 
 STAGE PLANS:
   Stage: Stage-2
     Spark
-#### A masked pattern was here ####
-      Vertices:
-        Map 3 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  File Output Operator
-                    compressed: true
-                    table:
-                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
-
-  Stage: Stage-4
-    Spark
       Edges:
-        Reducer 4 <- Map 1 (GROUP, 31)
+        Reducer 2 <- Map 1 (GROUP, 31)
+        Reducer 3 <- Map 1 (GROUP, 31)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
             Map Operator Tree:
                 TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: key (type: string), value (type: string)
                     outputColumnNames: key, value
@@ -80,7 +65,23 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col1 (type: double)
-        Reducer 4 
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: key, value
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: sum(substr(value, 5))
+                      keys: key (type: string)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: double)
+        Reducer 2 
             Reduce Operator Tree:
               Group By Operator
                 aggregations: sum(VALUE._col0)
@@ -100,6 +101,26 @@ STAGE PLANS:
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.dest1
+        Reducer 3 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                keys: KEY._col0 (type: string)
+                mode: mergepartial
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: UDFToInteger(_col0) (type: int), _col1 (type: double)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: true
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                        name: default.dest2
 
   Stage: Stage-3
     Dependency Collection
@@ -114,7 +135,7 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest1
 
-  Stage: Stage-6
+  Stage: Stage-4
     Stats-Aggr Operator
 
   Stage: Stage-1
@@ -127,54 +148,8 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest2
 
-  Stage: Stage-7
-    Stats-Aggr Operator
-
   Stage: Stage-5
-    Spark
-      Edges:
-        Reducer 5 <- Map 2 (GROUP, 31)
-#### A masked pattern was here ####
-      Vertices:
-        Map 2 
-            Map Operator Tree:
-                TableScan
-                  Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                    Group By Operator
-                      aggregations: sum(substr(value, 5))
-                      keys: key (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: string)
-                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                        value expressions: _col1 (type: double)
-        Reducer 5 
-            Reduce Operator Tree:
-              Group By Operator
-                aggregations: sum(VALUE._col0)
-                keys: KEY._col0 (type: string)
-                mode: mergepartial
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                Select Operator
-                  expressions: UDFToInteger(_col0) (type: int), _col1 (type: double)
-                  outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                  File Output Operator
-                    compressed: true
-                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.dest2
+    Stats-Aggr Operator
 
 PREHOOK: query: FROM SRC
 INSERT OVERWRITE TABLE DEST1 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key

Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out Tue Oct 21 02:45:04 2014
@@ -30,34 +30,18 @@ INSERT OVERWRITE TABLE DEST2 SELECT SRC.
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-2 is a root stage
-  Stage-4 depends on stages: Stage-2
-  Stage-3 depends on stages: Stage-4, Stage-5
+  Stage-3 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-3
-  Stage-6 depends on stages: Stage-0
+  Stage-4 depends on stages: Stage-0
   Stage-1 depends on stages: Stage-3
-  Stage-7 depends on stages: Stage-1
-  Stage-5 depends on stages: Stage-2
+  Stage-5 depends on stages: Stage-1
 
 STAGE PLANS:
   Stage: Stage-2
     Spark
-#### A masked pattern was here ####
-      Vertices:
-        Map 3 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  File Output Operator
-                    compressed: true
-                    table:
-                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
-
-  Stage: Stage-4
-    Spark
       Edges:
+        Reducer 2 <- Map 1 (GROUP SORT, 31)
+        Reducer 3 <- Reducer 2 (GROUP, 31)
         Reducer 4 <- Map 1 (GROUP SORT, 31)
         Reducer 5 <- Reducer 4 (GROUP, 31)
 #### A masked pattern was here ####
@@ -65,6 +49,8 @@ STAGE PLANS:
         Map 1 
             Map Operator Tree:
                 TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: key (type: string), value (type: string)
                     outputColumnNames: key, value
@@ -81,7 +67,23 @@ STAGE PLANS:
                         Map-reduce partition columns: rand() (type: double)
                         Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col1 (type: double)
-        Reducer 4 
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: key, value
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Group By Operator
+                      aggregations: sum(substr(value, 5))
+                      keys: key (type: string)
+                      mode: hash
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      Reduce Output Operator
+                        key expressions: _col0 (type: string)
+                        sort order: +
+                        Map-reduce partition columns: rand() (type: double)
+                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                        value expressions: _col1 (type: double)
+        Reducer 2 
             Reduce Operator Tree:
               Group By Operator
                 aggregations: sum(VALUE._col0)
@@ -95,7 +97,7 @@ STAGE PLANS:
                   Map-reduce partition columns: _col0 (type: string)
                   Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                   value expressions: _col1 (type: double)
-        Reducer 5 
+        Reducer 3 
             Reduce Operator Tree:
               Group By Operator
                 aggregations: sum(VALUE._col0)
@@ -115,63 +117,7 @@ STAGE PLANS:
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.dest1
-
-  Stage: Stage-3
-    Dependency Collection
-
-  Stage: Stage-0
-    Move Operator
-      tables:
-          replace: true
-          table:
-              input format: org.apache.hadoop.mapred.TextInputFormat
-              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-              name: default.dest1
-
-  Stage: Stage-6
-    Stats-Aggr Operator
-
-  Stage: Stage-1
-    Move Operator
-      tables:
-          replace: true
-          table:
-              input format: org.apache.hadoop.mapred.TextInputFormat
-              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-              name: default.dest2
-
-  Stage: Stage-7
-    Stats-Aggr Operator
-
-  Stage: Stage-5
-    Spark
-      Edges:
-        Reducer 6 <- Map 2 (GROUP SORT, 31)
-        Reducer 7 <- Reducer 6 (GROUP, 31)
-#### A masked pattern was here ####
-      Vertices:
-        Map 2 
-            Map Operator Tree:
-                TableScan
-                  Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                    Group By Operator
-                      aggregations: sum(substr(value, 5))
-                      keys: key (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: rand() (type: double)
-                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                        value expressions: _col1 (type: double)
-        Reducer 6 
+        Reducer 4 
             Reduce Operator Tree:
               Group By Operator
                 aggregations: sum(VALUE._col0)
@@ -185,7 +131,7 @@ STAGE PLANS:
                   Map-reduce partition columns: _col0 (type: string)
                   Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                   value expressions: _col1 (type: double)
-        Reducer 7 
+        Reducer 5 
             Reduce Operator Tree:
               Group By Operator
                 aggregations: sum(VALUE._col0)
@@ -206,6 +152,35 @@ STAGE PLANS:
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.dest2
 
+  Stage: Stage-3
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.dest1
+
+  Stage: Stage-4
+    Stats-Aggr Operator
+
+  Stage: Stage-1
+    Move Operator
+      tables:
+          replace: true
+          table:
+              input format: org.apache.hadoop.mapred.TextInputFormat
+              output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+              serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+              name: default.dest2
+
+  Stage: Stage-5
+    Stats-Aggr Operator
+
 PREHOOK: query: FROM SRC
 INSERT OVERWRITE TABLE DEST1 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key
 INSERT OVERWRITE TABLE DEST2 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key

Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out?rev=1633268&r1=1633267&r2=1633268&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out Tue Oct 21 02:45:04 2014
@@ -30,40 +30,25 @@ INSERT OVERWRITE TABLE DEST2 SELECT SRC.
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-2 is a root stage
-  Stage-4 depends on stages: Stage-2
-  Stage-3 depends on stages: Stage-4, Stage-5
+  Stage-3 depends on stages: Stage-2
   Stage-0 depends on stages: Stage-3
-  Stage-6 depends on stages: Stage-0
+  Stage-4 depends on stages: Stage-0
   Stage-1 depends on stages: Stage-3
-  Stage-7 depends on stages: Stage-1
-  Stage-5 depends on stages: Stage-2
+  Stage-5 depends on stages: Stage-1
 
 STAGE PLANS:
   Stage: Stage-2
     Spark
-#### A masked pattern was here ####
-      Vertices:
-        Map 3 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  File Output Operator
-                    compressed: true
-                    table:
-                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
-
-  Stage: Stage-4
-    Spark
       Edges:
-        Reducer 4 <- Map 1 (GROUP, 31)
+        Reducer 2 <- Map 1 (GROUP, 31)
+        Reducer 3 <- Map 1 (GROUP, 31)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
             Map Operator Tree:
                 TableScan
+                  alias: src
+                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                   Select Operator
                     expressions: key (type: string), value (type: string)
                     outputColumnNames: key, value
@@ -74,7 +59,17 @@ STAGE PLANS:
                       Map-reduce partition columns: key (type: string)
                       Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                       value expressions: substr(value, 5) (type: string)
-        Reducer 4 
+                  Select Operator
+                    expressions: key (type: string), value (type: string)
+                    outputColumnNames: key, value
+                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                    Reduce Output Operator
+                      key expressions: key (type: string)
+                      sort order: +
+                      Map-reduce partition columns: key (type: string)
+                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+                      value expressions: substr(value, 5) (type: string)
+        Reducer 2 
             Reduce Operator Tree:
               Group By Operator
                 aggregations: sum(VALUE._col0)
@@ -94,6 +89,26 @@ STAGE PLANS:
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.dest1
+        Reducer 3 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: sum(VALUE._col0)
+                keys: KEY._col0 (type: string)
+                mode: complete
+                outputColumnNames: _col0, _col1
+                Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                Select Operator
+                  expressions: UDFToInteger(_col0) (type: int), _col1 (type: double)
+                  outputColumnNames: _col0, _col1
+                  Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                  File Output Operator
+                    compressed: true
+                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
+                    table:
+                        input format: org.apache.hadoop.mapred.TextInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                        name: default.dest2
 
   Stage: Stage-3
     Dependency Collection
@@ -108,7 +123,7 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest1
 
-  Stage: Stage-6
+  Stage: Stage-4
     Stats-Aggr Operator
 
   Stage: Stage-1
@@ -121,48 +136,8 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest2
 
-  Stage: Stage-7
-    Stats-Aggr Operator
-
   Stage: Stage-5
-    Spark
-      Edges:
-        Reducer 5 <- Map 2 (GROUP, 31)
-#### A masked pattern was here ####
-      Vertices:
-        Map 2 
-            Map Operator Tree:
-                TableScan
-                  Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: key (type: string)
-                      sort order: +
-                      Map-reduce partition columns: key (type: string)
-                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                      value expressions: substr(value, 5) (type: string)
-        Reducer 5 
-            Reduce Operator Tree:
-              Group By Operator
-                aggregations: sum(VALUE._col0)
-                keys: KEY._col0 (type: string)
-                mode: complete
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                Select Operator
-                  expressions: UDFToInteger(_col0) (type: int), _col1 (type: double)
-                  outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                  File Output Operator
-                    compressed: true
-                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                    table:
-                        input format: org.apache.hadoop.mapred.TextInputFormat
-                        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                        name: default.dest2
+    Stats-Aggr Operator
 
 PREHOOK: query: FROM SRC
 INSERT OVERWRITE TABLE DEST1 SELECT SRC.key, sum(SUBSTR(SRC.value,5)) GROUP BY SRC.key