Posted to commits@hive.apache.org by xu...@apache.org on 2014/09/20 07:29:30 UTC

svn commit: r1626383 - in /hive/branches/spark/ql/src: java/org/apache/hadoop/hive/ql/parse/spark/ test/results/clientpositive/spark/

Author: xuefu
Date: Sat Sep 20 05:29:30 2014
New Revision: 1626383

URL: http://svn.apache.org/r1626383
Log:
HIVE-7503: Support Hive's multi-table insert query with Spark [Spark Branch] (Chao via Xuefu)
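
For illustration, a multi-table insert query of the kind this patch targets feeds several
INSERT branches from a single FROM clause. A minimal sketch, loosely adapted from the
insert1.q test updated below (table and column names follow that test and are illustrative,
not a verbatim copy of it):

    FROM insert2
      INSERT OVERWRITE TABLE insert1   SELECT key, value WHERE key < 10
      INSERT OVERWRITE TABLE x.insert1 SELECT key, value WHERE key > 10 AND key < 20;

With this change, the shared scan of the source table runs in one SparkTask, and each INSERT
branch is split into its own SparkTask connected to the scan through a temporary file (see
SparkMultiInsertionProcessor below).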

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMergeTaskProcessor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
    hive/branches/spark/ql/src/test/results/clientpositive/spark/insert1.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/union18.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/union19.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/union_remove_6.q.out

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java Sat Sep 20 05:29:30 2014
@@ -31,10 +31,11 @@ import org.apache.hadoop.hive.ql.exec.De
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -48,9 +49,9 @@ import org.apache.hadoop.hive.ql.plan.Sp
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
 /**
- * GenSparkProcContext maintains information about the tasks and operators 
+ * GenSparkProcContext maintains information about the tasks and operators
  * as we walk the operator tree to break them into SparkTasks.
- * 
+ *
  * Cloned from GenTezProcContext.
  *
  */
@@ -76,6 +77,10 @@ public class GenSparkProcContext impleme
   // walk.
   public Operator<? extends OperatorDesc> parentOfRoot;
 
+  // The default task is the task we use for operators that are not connected
+  // to a newly generated TS.
+  public SparkTask defaultTask;
+
   // Spark task we're currently processing
   public SparkTask currentTask;
 
@@ -83,6 +88,20 @@ public class GenSparkProcContext impleme
   // one.
   public BaseWork preceedingWork;
 
+  // All operators that we should unlink from their parents, for multi-table insertion.
+  // It's a mapping from operator to its ONLY parent.
+  public Map<Operator<?>, Operator<?>> opToParentMap;
+
+  // A mapping from operators to their corresponding tasks.
+  // The keys of this map can only be:
+  //  1. TableScanOperators (so we know which task owns the tree rooted at this TS)
+  //  2. FileSinkOperators (this info is needed in GenSparkUtils::processFileSinks)
+  //  3. UnionOperators/JoinOperators (needed for merging tasks)
+  public final Map<Operator<?>, SparkTask> opToTaskMap;
+
+  // temporary TS generated for multi-table insertion
+  public final Set<TableScanOperator> tempTS;
+
   // map that keeps track of the last operator of a task to the work
   // that follows it. This is used for connecting them later.
   public final Map<Operator<?>, BaseWork> leafOperatorToFollowingWork;
@@ -138,8 +157,10 @@ public class GenSparkProcContext impleme
     this.rootTasks = rootTasks;
     this.inputs = inputs;
     this.outputs = outputs;
-    this.currentTask = (SparkTask) TaskFactory.get(
-         new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
+    this.defaultTask = (SparkTask) TaskFactory.get(
+        new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
+    this.rootTasks.add(defaultTask);
+    this.currentTask = null;
     this.leafOperatorToFollowingWork = new LinkedHashMap<Operator<?>, BaseWork>();
     this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, SparkEdgeProperty>>();
     this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>();
@@ -157,8 +178,8 @@ public class GenSparkProcContext impleme
     this.clonedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
     this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
     this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
-
-    rootTasks.add(currentTask);
+    this.opToParentMap = new LinkedHashMap<Operator<?>, Operator<?>>();
+    this.opToTaskMap = new LinkedHashMap<Operator<?>, SparkTask>();
+    this.tempTS = new LinkedHashSet<TableScanOperator>();
   }
-
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Sat Sep 20 05:29:30 2014
@@ -34,14 +34,30 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.*;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
 
 /**
  * GenSparkUtils is a collection of shared helper methods to produce SparkWork
@@ -157,6 +173,16 @@ public class GenSparkUtils {
     return mapWork;
   }
 
+  // Create a MapWork for a temporary TableScanOperator
+  // Basically a thin wrapper on GenMapRedUtils.setTaskPlan.
+  public MapWork createMapWork(TableScanOperator root,
+                               SparkWork sparkWork, String path, TableDesc tt_desc) throws SemanticException {
+    MapWork mapWork = new MapWork("Map " + (++sequenceNumber));
+    GenMapRedUtils.setTaskPlan(path, path, root, mapWork, false, tt_desc);
+    sparkWork.add(mapWork);
+    return mapWork;
+  }
+
   // this method's main use is to help unit testing this class
   protected void setupMapWork(MapWork mapWork, GenSparkProcContext context,
       PrunedPartitionList partitions, Operator<? extends OperatorDesc> root,
@@ -251,15 +277,19 @@ public class GenSparkUtils {
       throws SemanticException {
 
     ParseContext parseContext = context.parseContext;
+    Preconditions.checkArgument(context.opToTaskMap.containsKey(fileSink),
+        "AssertionError: the fileSink " + fileSink.getName() + " should be in the context");
+
+    SparkTask currentTask = context.opToTaskMap.get(fileSink);
 
     boolean isInsertTable = // is INSERT OVERWRITE TABLE
         GenMapRedUtils.isInsertInto(parseContext, fileSink);
     HiveConf hconf = parseContext.getConf();
 
     boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
-        hconf, fileSink, context.currentTask, isInsertTable);
+        hconf, fileSink, currentTask, isInsertTable);
 
-    Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
+    Path finalName = GenMapRedUtils.createMoveTask(currentTask,
         chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
 
     if (chDir) {
@@ -268,13 +298,13 @@ public class GenSparkUtils {
       logger.info("using CombineHiveInputformat for the merge job");
       GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
           context.dependencyTask, context.moveTask,
-          hconf, context.currentTask);
+          hconf, currentTask);
     }
 
     FetchTask fetchTask = parseContext.getFetchTask();
-    if (fetchTask != null && context.currentTask.getNumChild() == 0) {
+    if (fetchTask != null && currentTask.getNumChild() == 0) {
       if (fetchTask.isFetchFrom(fileSink.getConf())) {
-        context.currentTask.setFetchSource(true);
+        currentTask.setFetchSource(true);
       }
     }
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Sat Sep 20 05:29:30 2014
@@ -50,7 +50,7 @@ import com.google.common.base.Preconditi
  * and break the operators into work and tasks along the way.
  *
  * Cloned from GenTezWork.
- * 
+ *
  * TODO: need to go thru this to make it fit completely to Spark.
  */
 public class GenSparkWork implements NodeProcessor {
@@ -95,6 +95,10 @@ public class GenSparkWork implements Nod
       return null;
     }
 
+    if (operator instanceof FileSinkOperator) {
+      context.opToTaskMap.put(operator, context.currentTask);
+    }
+
     SparkWork sparkWork = context.currentTask.getWork();
 
     // Right now the work graph is pretty simple. If there is no

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Sat Sep 20 05:29:30 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -72,9 +73,9 @@ import org.apache.hadoop.hive.ql.session
 
 /**
  * SparkCompiler translates the operator plan into SparkTasks.
- * 
+ *
  * Pretty much cloned from TezCompiler.
- * 
+ *
  * TODO: need to complete and make it fit to Spark.
  */
 public class SparkCompiler extends TaskCompiler {
@@ -86,7 +87,7 @@ public class SparkCompiler extends TaskC
   @Override
   public void init(HiveConf conf, LogHelper console, Hive db) {
     super.init(conf, console, db);
-    
+
 //    TODO: Need to check if we require the use of recursive input dirs for union processing
 //    conf.setBoolean("mapred.input.dir.recursive", true);
 //    HiveConf.setBoolVar(conf, ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
@@ -103,7 +104,6 @@ public class SparkCompiler extends TaskC
     // Create the context for the walker
     OptimizeSparkProcContext procCtx
       = new OptimizeSparkProcContext(conf, pCtx, inputs, outputs, deque);
-
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack.
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
@@ -139,9 +139,39 @@ public class SparkCompiler extends TaskC
     GenSparkProcContext procCtx = new GenSparkProcContext(
         conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
 
+    // -------------------- First Pass ---------------------
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(new RuleRegExp("TS", TableScanOperator.getOperatorName() + "%"),
+        new SparkTableScanProcessor());
+
+    Dispatcher disp = new DefaultRuleDispatcher(new SparkMultiInsertionProcessor(), opRules, procCtx);
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
+    ogw.startWalking(topNodes, null);
+
+    // ------------------- Second Pass ----------------------
+
+    // Merge tasks upon Join/Union if possible
+    opRules.clear();
+    opRules.put(new RuleRegExp("Join", JoinOperator.getOperatorName() + "%"),
+        new SparkMergeTaskProcessor());
+    opRules.put(new RuleRegExp("Union", UnionOperator.getOperatorName() + "%"),
+        new SparkMergeTaskProcessor());
+    disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    topNodes = new ArrayList<Node>();
+    topNodes.addAll(procCtx.tempTS); // First process temp TS
+    topNodes.addAll(pCtx.getTopOps().values());
+    ogw = new GenSparkWorkWalker(disp, procCtx);
+    ogw.startWalking(topNodes, null);
+
+
+    // ------------------- Third Pass -----------------------
+
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack. The dispatcher generates the plan from the operator tree
-    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.clear();
     opRules.put(new RuleRegExp("Split Work - ReduceSink",
         ReduceSinkOperator.getOperatorName() + "%"), genSparkWork);
 
@@ -154,28 +184,40 @@ public class SparkCompiler extends TaskC
 
     opRules.put(new RuleRegExp("Handle Analyze Command",
         TableScanOperator.getOperatorName() + "%"),
-        new SparkProcessAnalyzeTable(GenSparkUtils.getUtils()));
-
-      opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"),
-          new NodeProcessor() {
-        @Override
-        public Object process(Node n, Stack<Node> s,
-          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
-          GenSparkProcContext context = (GenSparkProcContext) procCtx;
-          UnionOperator union = (UnionOperator) n;
-
-          // simply need to remember that we've seen a union.
-          context.currentUnionOperators.add(union);
-          return null;
+        new CompositeProcessor(
+            new NodeProcessor() {
+              @Override
+              public Object process(Node nd, Stack<Node> s,
+                                    NodeProcessorCtx procCtx, Object... no) throws SemanticException {
+                GenSparkProcContext context = (GenSparkProcContext) procCtx;
+                context.currentTask = context.opToTaskMap.get(nd);
+                return null;
+              }
+            },
+            new SparkProcessAnalyzeTable(GenSparkUtils.getUtils())));
+
+    opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"),
+        new NodeProcessor() {
+          @Override
+          public Object process(Node n, Stack<Node> s,
+                                NodeProcessorCtx procCtx, Object... os) throws SemanticException {
+            GenSparkProcContext context = (GenSparkProcContext) procCtx;
+            UnionOperator union = (UnionOperator) n;
+
+            // simply need to remember that we've seen a union.
+            context.currentUnionOperators.add(union);
+            return null;
+          }
         }
-      });
+    );
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
-    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
-    List<Node> topNodes = new ArrayList<Node>();
+    disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    topNodes = new ArrayList<Node>();
     topNodes.addAll(pCtx.getTopOps().values());
-    GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
+    topNodes.addAll(procCtx.tempTS);
+    ogw = new GenSparkWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
 
     // we need to clone some operator plans and remove union operators still

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMergeTaskProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMergeTaskProcessor.java?rev=1626383&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMergeTaskProcessor.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMergeTaskProcessor.java Sat Sep 20 05:29:30 2014
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.util.Map;
+import java.util.Stack;
+
+public class SparkMergeTaskProcessor implements NodeProcessor {
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+    GenSparkProcContext context = (GenSparkProcContext) procCtx;
+    Operator<?> op = (Operator<?>) nd;
+    Map<Operator<?>, SparkTask> opTable = context.opToTaskMap;
+    SparkTask currentTask = opTable.get(context.currentRootOperator);
+    if (!opTable.containsKey(op)) {
+      opTable.put(op, currentTask);
+    } else {
+      // If this op has already been visited, then since we visit temporary TS first,
+      // and under the assumption that two paths from two different temporary TS will NOT
+      // meet, the current task must be the default task.
+      // TODO: it would be better if we could prove that they'll never meet.
+      SparkTask existingTask = opTable.get(op);
+      if (currentTask == context.defaultTask && existingTask != context.defaultTask) {
+        opTable.put(context.currentRootOperator, existingTask);
+      }
+    }
+
+    return null;
+  }
+}

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java?rev=1626383&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java Sat Sep 20 05:29:30 2014
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Stack;
+
+
+public class SparkMultiInsertionProcessor implements NodeProcessor {
+
+  private Set<Operator<?>> processed = new HashSet<Operator<?>>();
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+                        Object... nodeOutputs) throws SemanticException {
+    Operator<?> op = (Operator<?>) nd;
+    GenSparkProcContext context = (GenSparkProcContext) procCtx;
+
+    if (context.opToParentMap.containsKey(op)) {
+      splitPlan(op, context);
+      context.opToParentMap.remove(op);
+    }
+
+    return null;
+  }
+
+  /**
+   * Split the plan into two tasks by creating a temporary file between them.
+   *
+   * @param op the operator at which the plan is split (a child of the LCA)
+   * @param context processing context
+   */
+  @SuppressWarnings("nls")
+  private void splitPlan(Operator<?> op, GenSparkProcContext context)
+      throws SemanticException {
+    Preconditions.checkArgument(op.getNumParent() == 1,
+        "AssertionError: expecting operator " + op + " to have only one parent," +
+            " but found multiple parents : " + op.getParentOperators());
+    // nested multi-insertion shouldn't happen.
+    SparkTask parentTask = context.defaultTask;
+    SparkTask childTask = (SparkTask) TaskFactory.get(
+        new SparkWork(context.conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), context.conf);
+
+    GenSparkUtils utils = GenSparkUtils.getUtils();
+    ParseContext parseCtx = context.parseContext;
+    parentTask.addDependentTask(childTask);
+
+    // Generate the temporary file name
+    Operator<?> parent = context.opToParentMap.get(op);
+
+    Path taskTmpDir;
+    TableDesc tt_desc;
+
+    if (processed.add(parent)) {
+      taskTmpDir = parseCtx.getContext().getMRTmpPath();
+      tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
+          .getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
+      createTempFS(parent, taskTmpDir, tt_desc, parseCtx);
+    } else {
+      FileSinkOperator fs = (FileSinkOperator) parent.getChildOperators().get(0);
+      tt_desc = fs.getConf().getTableInfo();
+      taskTmpDir = fs.getConf().getDirName();
+    }
+
+    TableScanOperator tableScan = createTempTS(parent, op, parseCtx);
+    String streamDesc = taskTmpDir.toUri().toString();
+    context.opToTaskMap.put(tableScan, childTask);
+    context.tempTS.add(tableScan);
+    MapWork mapWork = utils.createMapWork(tableScan, childTask.getWork(), streamDesc, tt_desc);
+    context.rootToWorkMap.put(tableScan, mapWork);
+  }
+
+  private void createTempFS(Operator<? extends OperatorDesc> parent,
+      Path taskTmpDir, TableDesc tt_desc, ParseContext parseCtx) {
+    boolean compressIntermediate =
+        parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSINTERMEDIATE);
+    FileSinkDesc desc = new FileSinkDesc(taskTmpDir, tt_desc, compressIntermediate);
+    if (compressIntermediate) {
+      desc.setCompressCodec(parseCtx.getConf().getVar(
+          HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC));
+      desc.setCompressType(parseCtx.getConf().getVar(
+          HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
+    }
+    Operator<? extends OperatorDesc> fileSinkOp = GenMapRedUtils.putOpInsertMap(OperatorFactory
+        .get(desc, parent.getSchema()), null, parseCtx);
+
+    // Connect parent to fileSinkOp
+    parent.setChildOperators(Utilities.makeList(fileSinkOp));
+    fileSinkOp.setParentOperators(Utilities.makeList(parent));
+  }
+
+  private TableScanOperator createTempTS(Operator<? extends OperatorDesc> parent,
+                                         Operator<? extends OperatorDesc> child,
+                                         ParseContext parseCtx) {
+    // Create a dummy TableScanOperator for the file generated through fileSinkOp
+    RowResolver parentRowResolver =
+        parseCtx.getOpParseCtx().get(parent).getRowResolver();
+    TableScanOperator tableScanOp = (TableScanOperator) GenMapRedUtils.putOpInsertMap(
+        GenMapRedUtils.createTemporaryTableScanOperator(parent.getSchema()),
+        parentRowResolver, parseCtx);
+
+    tableScanOp.setChildOperators(Utilities.makeList(child));
+    child.setParentOperators(Utilities.makeList(tableScanOp));
+
+    return tableScanOp;
+  }
+
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java Sat Sep 20 05:29:30 2014
@@ -77,8 +77,12 @@ public class SparkProcessAnalyzeTable im
   public Object process(Node nd, Stack<Node> stack,
       NodeProcessorCtx procContext, Object... nodeOutputs) throws SemanticException {
     GenSparkProcContext context = (GenSparkProcContext) procContext;
-    
+
     TableScanOperator tableScan = (TableScanOperator) nd;
+    // If this tableScan is a generated one for multi-insertion, ignore it
+    if (context.tempTS.contains(tableScan)) {
+      return null;
+    }
 
     ParseContext parseContext = context.parseContext;
     Class<? extends InputFormat> inputFormat = parseContext.getTopToTable().get(tableScan)
@@ -167,14 +171,14 @@ public class SparkProcessAnalyzeTable im
    *
    * It is composed of PartialScanTask followed by StatsTask.
    */
-  private void handlePartialScanCommand(TableScanOperator tableScan, ParseContext parseContext, 
+  private void handlePartialScanCommand(TableScanOperator tableScan, ParseContext parseContext,
       QBParseInfo parseInfo, StatsWork statsWork, GenSparkProcContext context,
       Task<StatsWork> statsTask) throws SemanticException {
     String aggregationKey = tableScan.getConf().getStatsAggPrefix();
     StringBuffer aggregationKeyBuffer = new StringBuffer(aggregationKey);
     List<Path> inputPaths = GenMapRedUtils.getInputPathsForPartialScan(parseInfo, aggregationKeyBuffer);
     aggregationKey = aggregationKeyBuffer.toString();
-    
+
     // scan work
     PartialScanWork scanWork = new PartialScanWork(inputPaths);
     scanWork.setMapperCannotSpanPartns(true);

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java?rev=1626383&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java Sat Sep 20 05:29:30 2014
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import com.clearspring.analytics.util.Preconditions;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.ForwardOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Stack;
+
+public class SparkTableScanProcessor implements NodeProcessor {
+
+  @Override
+  public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+                        Object... nodeOutputs) throws SemanticException {
+    GenSparkProcContext context = (GenSparkProcContext) procCtx;
+    TableScanOperator tblScan = (TableScanOperator) nd;
+
+    context.opToTaskMap.put(tblScan, context.defaultTask);
+
+    // For multi-table insertion, we first look for multiple FSs that can be reached
+    // from this TS. While searching, we also record the path to each of these FSs.
+    // Then, we find the LCA for these FSs.
+    //
+    // That is, in scenarios like the following:
+    //
+    //                   OP 1 (TS, UNION, etc)
+    //                 /    \
+    //               OP 2   OP 3
+    //
+    // If we find such an operator, we record all of its children in the context, and unlink
+    //   them from this operator later, in SparkMultiInsertionProcessor, so that it becomes:
+    //
+    //                  OP 1 (TS, UNION, FOR, etc)
+    //                  |
+    //                  FS
+    //
+    //              TS      TS
+    //              |        |
+    //             OP 2     OP 3
+    //
+    // where the two branches starting with TS are in different Spark tasks.
+    //
+    // Because of the restrictions on multi-insertion queries, there could only be two
+    // categories of TS here: one through which we can reach multiple FSs, and one through
+    // which we can only reach one FS. All TS in the first category should be able to
+    // reach the same set of FSs.
+    // A further conclusion is that there should be only one LCA for the entire operator tree.
+    //
+    // N.B.: one special case is when OP is ForwardOperator, in which case we shouldn't break
+    // the tree since it's already optimized.
+    Map<FileSinkOperator, Stack<Operator<? extends OperatorDesc>>> fsToPath
+        = new HashMap<FileSinkOperator, Stack<Operator<? extends OperatorDesc>>>();
+    Queue<Stack<Operator<? extends OperatorDesc>>> paths =
+        new LinkedList<Stack<Operator<? extends OperatorDesc>>>();
+    Stack<Operator<? extends OperatorDesc>> p = new Stack<Operator<? extends OperatorDesc>>();
+    p.push(tblScan);
+    paths.offer(p);
+
+    while (!paths.isEmpty()) {
+      Stack<Operator<? extends OperatorDesc>> currPath = paths.poll();
+      Operator<? extends OperatorDesc> currOp = currPath.peek();
+      if (currOp instanceof FileSinkOperator) {
+        FileSinkOperator fsOp = (FileSinkOperator) currOp;
+        // In case there are multiple paths leading to this FS, we keep the shortest one.
+        // (We could also keep the longest one - it doesn't matter)
+        if (!fsToPath.containsKey(fsOp) || currPath.size() < fsToPath.get(fsOp).size()) {
+          fsToPath.put(fsOp, currPath);
+        }
+      }
+
+      for (Operator<? extends OperatorDesc> nextOp : currOp.getChildOperators()) {
+        Stack<Operator<? extends OperatorDesc>> nextPath = new Stack<Operator<? extends OperatorDesc>>();
+        nextPath.addAll(currPath);
+        nextPath.push(nextOp);
+        paths.offer(nextPath);
+      }
+    }
+
+    if (fsToPath.size() > 1) {
+      // Now, compute the LOWEST height for all these FSs
+      int lowest = -1;
+      for (Map.Entry<FileSinkOperator, Stack<Operator<? extends OperatorDesc>>> e : fsToPath.entrySet()) {
+        if (lowest < 0 || e.getValue().size() < lowest) {
+          lowest = e.getValue().size();
+        }
+      }
+
+      // Now, we trim every path that is longer than the lowest height
+      for (Stack<Operator<? extends OperatorDesc>> st : fsToPath.values()) {
+        while (st.size() > lowest) {
+          st.pop();
+        }
+      }
+
+      // Now, we move all paths up together, until we reach a least common ancestor
+      Operator<? extends OperatorDesc> lca;
+      while (true) {
+        lca = null;
+        boolean same = true;
+        for (Stack<Operator<? extends OperatorDesc>> st : fsToPath.values()) {
+          Operator<? extends OperatorDesc> op = st.pop();
+          if (lca == null) {
+            lca = op;
+          } else if (lca != op) {
+            same = false; // but we still need to pop the rest..
+          }
+        }
+        if (same) {
+          break;
+        }
+      }
+
+      Preconditions.checkArgument(lca.getNumChild() > 1,
+          "AssertionError: the LCA should have multiple children, but got " + lca.getNumChild());
+
+      // Special case: don't break the tree if the LCA is a ForwardOperator (FOR).
+      if (!(lca instanceof ForwardOperator)) {
+        for (Operator<? extends OperatorDesc> childOp : lca.getChildOperators()) {
+          context.opToParentMap.put(childOp, lca);
+        }
+      }
+    }
+
+    return null;
+  }
+}
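
The union18.q and union19.q plan updates below exercise the LCA case described in the
comments above, where the least common ancestor of the FileSinkOperators is a UnionOperator
rather than a TableScanOperator. An illustrative query of that shape (a query of this form
appears, truncated, in the union18.q.out hunk that follows):

    FROM (
      SELECT 'tst1' AS key, cast(count(1) AS string) AS value FROM src s1
      UNION ALL
      SELECT s2.key AS key, s2.value AS value FROM src s2
    ) unionsrc
      INSERT OVERWRITE TABLE dest1 SELECT unionsrc.key, unionsrc.value
      INSERT OVERWRITE TABLE dest2 SELECT unionsrc.key, unionsrc.value, unionsrc.value;

Here the union is the LCA, so its INSERT branches are unlinked from it and re-attached to
temporary TableScanOperators, each running in its own SparkTask (Stage-4 and Stage-5 in the
updated plans).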

Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/insert1.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/insert1.q.out?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/insert1.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/insert1.q.out Sat Sep 20 05:29:30 2014
@@ -265,22 +265,38 @@ insert overwrite table x.insert1 select 
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-2 is a root stage
-  Stage-3 depends on stages: Stage-2
+  Stage-4 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-4, Stage-5
   Stage-0 depends on stages: Stage-3
-  Stage-4 depends on stages: Stage-0
+  Stage-6 depends on stages: Stage-0
   Stage-1 depends on stages: Stage-3
-  Stage-5 depends on stages: Stage-1
+  Stage-7 depends on stages: Stage-1
+  Stage-5 depends on stages: Stage-2
 
 STAGE PLANS:
   Stage: Stage-2
     Spark
 #### A masked pattern was here ####
       Vertices:
-        Map 1 
+        Map 3 
             Map Operator Tree:
                 TableScan
                   alias: insert2
                   Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                  File Output Operator
+                    compressed: false
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+  Stage: Stage-4
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
                   Filter Operator
                     predicate: (key < 10) (type: boolean)
                     Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
@@ -296,21 +312,6 @@ STAGE PLANS:
                             output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                             serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                             name: default.insert1
-                  Filter Operator
-                    predicate: ((key > 10) and (key < 20)) (type: boolean)
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Select Operator
-                      expressions: key (type: int), value (type: string)
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                      File Output Operator
-                        compressed: false
-                        Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                        table:
-                            input format: org.apache.hadoop.mapred.TextInputFormat
-                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                            name: x.insert1
 
   Stage: Stage-3
     Dependency Collection
@@ -325,7 +326,7 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.insert1
 
-  Stage: Stage-4
+  Stage: Stage-6
     Stats-Aggr Operator
 
   Stage: Stage-1
@@ -338,9 +339,32 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: x.insert1
 
-  Stage: Stage-5
+  Stage: Stage-7
     Stats-Aggr Operator
 
+  Stage: Stage-5
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  Filter Operator
+                    predicate: ((key > 10) and (key < 20)) (type: boolean)
+                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                    Select Operator
+                      expressions: key (type: int), value (type: string)
+                      outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                      File Output Operator
+                        compressed: false
+                        Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                        table:
+                            input format: org.apache.hadoop.mapred.TextInputFormat
+                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                            name: x.insert1
+
 PREHOOK: query: -- HIVE-3676
 CREATE DATABASE db2
 PREHOOK: type: CREATEDATABASE

Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/union18.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/union18.q.out?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/union18.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/union18.q.out Sat Sep 20 05:29:30 2014
@@ -34,21 +34,23 @@ INSERT OVERWRITE TABLE DEST2 SELECT unio
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-2 is a root stage
-  Stage-3 depends on stages: Stage-2
+  Stage-4 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-4, Stage-5
   Stage-0 depends on stages: Stage-3
-  Stage-4 depends on stages: Stage-0
+  Stage-6 depends on stages: Stage-0
   Stage-1 depends on stages: Stage-3
-  Stage-5 depends on stages: Stage-1
+  Stage-7 depends on stages: Stage-1
+  Stage-5 depends on stages: Stage-2
 
 STAGE PLANS:
   Stage: Stage-2
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP)
-        Union 3 <- Map 4 (NONE), Reducer 2 (NONE)
+        Reducer 4 <- Map 3 (GROUP)
+        Union 5 <- Map 6 (NONE), Reducer 4 (NONE)
 #### A masked pattern was here ####
       Vertices:
-        Map 1 
+        Map 3 
             Map Operator Tree:
                 TableScan
                   alias: s1
@@ -64,34 +66,20 @@ STAGE PLANS:
                         sort order: 
                         Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col0 (type: bigint)
-        Map 4 
+        Map 6 
             Map Operator Tree:
                 TableScan
                   alias: s2
                   Select Operator
                     expressions: key (type: string), value (type: string)
                     outputColumnNames: _col0, _col1
-                    Select Operator
-                      expressions: _col0 (type: string), _col1 (type: string)
-                      outputColumnNames: _col0, _col1
-                      File Output Operator
-                        compressed: false
-                        table:
-                            input format: org.apache.hadoop.mapred.TextInputFormat
-                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                            name: default.dest1
-                    Select Operator
-                      expressions: _col0 (type: string), _col1 (type: string), _col1 (type: string)
-                      outputColumnNames: _col0, _col1, _col2
-                      File Output Operator
-                        compressed: false
-                        table:
-                            input format: org.apache.hadoop.mapred.TextInputFormat
-                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                            name: default.dest2
-        Reducer 2 
+                    File Output Operator
+                      compressed: false
+                      table:
+                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+        Reducer 4 
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
@@ -100,28 +88,34 @@ STAGE PLANS:
                 Select Operator
                   expressions: 'tst1' (type: string), UDFToString(_col0) (type: string)
                   outputColumnNames: _col0, _col1
+                  File Output Operator
+                    compressed: false
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+        Union 5 
+            Vertex: Union 5
+
+  Stage: Stage-4
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
                   Select Operator
                     expressions: _col0 (type: string), _col1 (type: string)
                     outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 501 Data size: 136272 Basic stats: COMPLETE Column stats: PARTIAL
                     File Output Operator
                       compressed: false
+                      Statistics: Num rows: 501 Data size: 136272 Basic stats: COMPLETE Column stats: PARTIAL
                       table:
                           input format: org.apache.hadoop.mapred.TextInputFormat
                           output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                           serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                           name: default.dest1
-                  Select Operator
-                    expressions: _col0 (type: string), _col1 (type: string), _col1 (type: string)
-                    outputColumnNames: _col0, _col1, _col2
-                    File Output Operator
-                      compressed: false
-                      table:
-                          input format: org.apache.hadoop.mapred.TextInputFormat
-                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                          name: default.dest2
-        Union 3 
-            Vertex: Union 3
 
   Stage: Stage-3
     Dependency Collection
@@ -136,7 +130,7 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest1
 
-  Stage: Stage-4
+  Stage: Stage-6
     Stats-Aggr Operator
 
   Stage: Stage-1
@@ -149,9 +143,29 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest2
 
-  Stage: Stage-5
+  Stage: Stage-7
     Stats-Aggr Operator
 
+  Stage: Stage-5
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  Select Operator
+                    expressions: _col0 (type: string), _col1 (type: string), _col1 (type: string)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 501 Data size: 228456 Basic stats: COMPLETE Column stats: PARTIAL
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 501 Data size: 228456 Basic stats: COMPLETE Column stats: PARTIAL
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                          name: default.dest2
+
 PREHOOK: query: FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
                          UNION  ALL  
       select s2.key as key, s2.value as value from src s2) unionsrc

Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/union19.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/union19.q.out?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/union19.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/union19.q.out Sat Sep 20 05:29:30 2014
@@ -34,22 +34,23 @@ INSERT OVERWRITE TABLE DEST2 SELECT unio
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-2 is a root stage
-  Stage-3 depends on stages: Stage-2
+  Stage-4 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-4, Stage-5
   Stage-0 depends on stages: Stage-3
-  Stage-4 depends on stages: Stage-0
+  Stage-6 depends on stages: Stage-0
   Stage-1 depends on stages: Stage-3
-  Stage-5 depends on stages: Stage-1
+  Stage-7 depends on stages: Stage-1
+  Stage-5 depends on stages: Stage-2
 
 STAGE PLANS:
   Stage: Stage-2
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP)
-        Reducer 4 <- Union 3 (GROUP)
-        Union 3 <- Map 5 (NONE), Reducer 2 (NONE)
+        Reducer 4 <- Map 3 (GROUP)
+        Union 5 <- Map 6 (NONE), Reducer 4 (NONE)
 #### A masked pattern was here ####
       Vertices:
-        Map 1 
+        Map 3 
             Map Operator Tree:
                 TableScan
                   alias: s1
@@ -65,37 +66,20 @@ STAGE PLANS:
                         sort order: 
                         Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
                         value expressions: _col0 (type: bigint)
-        Map 5 
+        Map 6 
             Map Operator Tree:
                 TableScan
                   alias: s2
                   Select Operator
                     expressions: key (type: string), value (type: string)
                     outputColumnNames: _col0, _col1
-                    Select Operator
-                      expressions: _col0 (type: string), _col1 (type: string)
-                      outputColumnNames: _col0, _col1
-                      Group By Operator
-                        aggregations: count(_col1)
-                        keys: _col0 (type: string)
-                        mode: hash
-                        outputColumnNames: _col0, _col1
-                        Reduce Output Operator
-                          key expressions: _col0 (type: string)
-                          sort order: +
-                          Map-reduce partition columns: _col0 (type: string)
-                          value expressions: _col1 (type: bigint)
-                    Select Operator
-                      expressions: _col0 (type: string), _col1 (type: string), _col1 (type: string)
-                      outputColumnNames: _col0, _col1, _col2
-                      File Output Operator
-                        compressed: false
-                        table:
-                            input format: org.apache.hadoop.mapred.TextInputFormat
-                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                            name: default.dest2
-        Reducer 2 
+                    File Output Operator
+                      compressed: false
+                      table:
+                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+        Reducer 4 
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
@@ -104,30 +88,41 @@ STAGE PLANS:
                 Select Operator
                   expressions: 'tst1' (type: string), UDFToString(_col0) (type: string)
                   outputColumnNames: _col0, _col1
+                  File Output Operator
+                    compressed: false
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+        Union 5 
+            Vertex: Union 5
+
+  Stage: Stage-4
+    Spark
+      Edges:
+        Reducer 7 <- Map 1 (GROUP)
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
                   Select Operator
                     expressions: _col0 (type: string), _col1 (type: string)
                     outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 501 Data size: 5584 Basic stats: COMPLETE Column stats: PARTIAL
                     Group By Operator
                       aggregations: count(_col1)
                       keys: _col0 (type: string)
                       mode: hash
                       outputColumnNames: _col0, _col1
+                      Statistics: Num rows: 501 Data size: 48096 Basic stats: COMPLETE Column stats: PARTIAL
                       Reduce Output Operator
                         key expressions: _col0 (type: string)
                         sort order: +
                         Map-reduce partition columns: _col0 (type: string)
+                        Statistics: Num rows: 501 Data size: 48096 Basic stats: COMPLETE Column stats: PARTIAL
                         value expressions: _col1 (type: bigint)
-                  Select Operator
-                    expressions: _col0 (type: string), _col1 (type: string), _col1 (type: string)
-                    outputColumnNames: _col0, _col1, _col2
-                    File Output Operator
-                      compressed: false
-                      table:
-                          input format: org.apache.hadoop.mapred.TextInputFormat
-                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                          name: default.dest2
-        Reducer 4 
+        Reducer 7 
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
@@ -147,8 +142,6 @@ STAGE PLANS:
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.dest1
-        Union 3 
-            Vertex: Union 3
 
   Stage: Stage-3
     Dependency Collection
@@ -163,7 +156,7 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest1
 
-  Stage: Stage-4
+  Stage: Stage-6
     Stats-Aggr Operator
 
   Stage: Stage-1
@@ -176,9 +169,29 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.dest2
 
-  Stage: Stage-5
+  Stage: Stage-7
     Stats-Aggr Operator
 
+  Stage: Stage-5
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  Select Operator
+                    expressions: _col0 (type: string), _col1 (type: string), _col1 (type: string)
+                    outputColumnNames: _col0, _col1, _col2
+                    Statistics: Num rows: 501 Data size: 228456 Basic stats: COMPLETE Column stats: PARTIAL
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 501 Data size: 228456 Basic stats: COMPLETE Column stats: PARTIAL
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                          name: default.dest2
+
 PREHOOK: query: FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
                          UNION  ALL  
       select s2.key as key, s2.value as value from src s2) unionsrc

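[Editorial note, not part of the diff] The golden files above exercise Hive's multi-table insert form, where a single FROM clause feeds several INSERT branches and each branch may aggregate independently. A minimal illustrative sketch of that query shape follows; the dest1/dest2 table names mirror the plan above, but the schemas and column lists here are assumptions for illustration, not taken from the test itself:

    -- Illustrative multi-table insert: one scan of src feeds two INSERT branches.
    -- dest1/dest2 schemas are assumed for this sketch, not copied from the .q.out above.
    FROM src s
    INSERT OVERWRITE TABLE dest1        -- aggregated branch (cf. the Group By / Reducer vertex above)
      SELECT s.key, count(s.value)
      GROUP BY s.key
    INSERT OVERWRITE TABLE dest2        -- pass-through branch (cf. the plain File Output Operator above)
      SELECT s.key, s.value, s.value;

Consistent with the diff, each insertion branch now gets its own Spark stage: the parent stage writes intermediate SequenceFile/LazyBinary output, and the new Map vertices in the follow-up stages scan that intermediate data before writing to the final tables.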
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/union_remove_6.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/union_remove_6.q.out?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/union_remove_6.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/union_remove_6.q.out Sat Sep 20 05:29:30 2014
@@ -64,20 +64,22 @@ insert overwrite table outputTbl2 select
 POSTHOOK: type: QUERY
 STAGE DEPENDENCIES:
   Stage-2 is a root stage
-  Stage-3 depends on stages: Stage-2
+  Stage-4 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-4, Stage-5
   Stage-0 depends on stages: Stage-3
   Stage-1 depends on stages: Stage-3
+  Stage-5 depends on stages: Stage-2
 
 STAGE PLANS:
   Stage: Stage-2
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP)
-        Reducer 5 <- Map 4 (GROUP)
-        Union 3 <- Reducer 2 (NONE), Reducer 5 (NONE)
+        Reducer 4 <- Map 3 (GROUP)
+        Reducer 7 <- Map 6 (GROUP)
+        Union 5 <- Reducer 4 (NONE), Reducer 7 (NONE)
 #### A masked pattern was here ####
       Vertices:
-        Map 1 
+        Map 3 
             Map Operator Tree:
                 TableScan
                   alias: inputtbl1
@@ -98,7 +100,7 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 0 Data size: 30 Basic stats: PARTIAL Column stats: NONE
                         value expressions: _col1 (type: bigint)
-        Map 4 
+        Map 6 
             Map Operator Tree:
                 TableScan
                   alias: inputtbl1
@@ -119,7 +121,7 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 0 Data size: 30 Basic stats: PARTIAL Column stats: NONE
                         value expressions: _col1 (type: bigint)
-        Reducer 2 
+        Reducer 4 
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
@@ -129,27 +131,13 @@ STAGE PLANS:
                 Select Operator
                   expressions: _col0 (type: string), _col1 (type: bigint)
                   outputColumnNames: _col0, _col1
-                  Select Operator
-                    expressions: _col0 (type: string), _col1 (type: bigint)
-                    outputColumnNames: _col0, _col1
-                    File Output Operator
-                      compressed: false
-                      table:
-                          input format: org.apache.hadoop.mapred.TextInputFormat
-                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                          name: default.outputtbl1
-                  Select Operator
-                    expressions: _col0 (type: string), _col1 (type: bigint)
-                    outputColumnNames: _col0, _col1
-                    File Output Operator
-                      compressed: false
-                      table:
-                          input format: org.apache.hadoop.mapred.TextInputFormat
-                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                          name: default.outputtbl2
-        Reducer 5 
+                  File Output Operator
+                    compressed: false
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+        Reducer 7 
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(VALUE._col0)
@@ -159,28 +147,34 @@ STAGE PLANS:
                 Select Operator
                   expressions: _col0 (type: string), _col1 (type: bigint)
                   outputColumnNames: _col0, _col1
+                  File Output Operator
+                    compressed: false
+                    table:
+                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+                        serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+        Union 5 
+            Vertex: Union 5
+
+  Stage: Stage-4
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
                   Select Operator
                     expressions: _col0 (type: string), _col1 (type: bigint)
                     outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                     File Output Operator
                       compressed: false
+                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                       table:
                           input format: org.apache.hadoop.mapred.TextInputFormat
                           output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                           serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                           name: default.outputtbl1
-                  Select Operator
-                    expressions: _col0 (type: string), _col1 (type: bigint)
-                    outputColumnNames: _col0, _col1
-                    File Output Operator
-                      compressed: false
-                      table:
-                          input format: org.apache.hadoop.mapred.TextInputFormat
-                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-                          name: default.outputtbl2
-        Union 3 
-            Vertex: Union 3
 
   Stage: Stage-3
     Dependency Collection
@@ -205,6 +199,26 @@ STAGE PLANS:
               serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
               name: default.outputtbl2
 
+  Stage: Stage-5
+    Spark
+#### A masked pattern was here ####
+      Vertices:
+        Map 2 
+            Map Operator Tree:
+                TableScan
+                  Select Operator
+                    expressions: _col0 (type: string), _col1 (type: bigint)
+                    outputColumnNames: _col0, _col1
+                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                      table:
+                          input format: org.apache.hadoop.mapred.TextInputFormat
+                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+                          name: default.outputtbl2
+
 PREHOOK: query: FROM (
   SELECT key, count(1) as values from inputTbl1 group by key
   UNION ALL