Posted to commits@hive.apache.org by xu...@apache.org on 2014/09/20 07:29:30 UTC
svn commit: r1626383 - in /hive/branches/spark/ql/src:
java/org/apache/hadoop/hive/ql/parse/spark/
test/results/clientpositive/spark/
Author: xuefu
Date: Sat Sep 20 05:29:30 2014
New Revision: 1626383
URL: http://svn.apache.org/r1626383
Log:
HIVE-7503: Support Hive's multi-table insert query with Spark [Spark Branch] (Chao via Xuefu)
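A multi-table insert shares a single source subtree among several INSERT clauses of one query, e.g. FROM src INSERT OVERWRITE TABLE dest1 SELECT key WHERE key < 10 INSERT OVERWRITE TABLE dest2 SELECT value WHERE key >= 10 (table names here are illustrative). The updated golden files below (insert1, union18, union19, union_remove_6) cover both the plain and the UNION ALL forms.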
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMergeTaskProcessor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
hive/branches/spark/ql/src/test/results/clientpositive/spark/insert1.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/union18.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/union19.q.out
hive/branches/spark/ql/src/test/results/clientpositive/spark/union_remove_6.q.out
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java Sat Sep 20 05:29:30 2014
@@ -31,10 +31,11 @@ import org.apache.hadoop.hive.ql.exec.De
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -48,9 +49,9 @@ import org.apache.hadoop.hive.ql.plan.Sp
import org.apache.hadoop.hive.ql.plan.SparkWork;
/**
- * GenSparkProcContext maintains information about the tasks and operators
+ * GenSparkProcContext maintains information about the tasks and operators
* as we walk the operator tree to break them into SparkTasks.
- *
+ *
* Cloned from GenTezProcContext.
*
*/
@@ -76,6 +77,10 @@ public class GenSparkProcContext impleme
// walk.
public Operator<? extends OperatorDesc> parentOfRoot;
+ // The default task is the task we use for operators that are not connected
+ // to any newly generated TS
+ public SparkTask defaultTask;
+
// Spark task we're currently processing
public SparkTask currentTask;
@@ -83,6 +88,20 @@ public class GenSparkProcContext impleme
// one.
public BaseWork preceedingWork;
+ // All operators that we should unlink from their parents, for multi-table insertion
+ // It's a mapping from operator to its ONLY parent.
+ public Map<Operator<?>, Operator<?>> opToParentMap;
+
+ // A mapping from operators to their corresponding tasks.
+ // The key for this map can only be:
+ // 1. TableScanOperators (so we know which task the tree rooted at this TS belongs to)
+ // 2. FileSinkOperators (needed in GenSparkUtils::processFileSinks)
+ // 3. UnionOperator/JoinOperator (needed for merging tasks)
+ public final Map<Operator<?>, SparkTask> opToTaskMap;
+
+ // temporary TS generated for multi-table insertion
+ public final Set<TableScanOperator> tempTS;
+
// map that keeps track of the last operator of a task to the work
// that follows it. This is used for connecting them later.
public final Map<Operator<?>, BaseWork> leafOperatorToFollowingWork;
@@ -138,8 +157,10 @@ public class GenSparkProcContext impleme
this.rootTasks = rootTasks;
this.inputs = inputs;
this.outputs = outputs;
- this.currentTask = (SparkTask) TaskFactory.get(
- new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
+ this.defaultTask = (SparkTask) TaskFactory.get(
+ new SparkWork(conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), conf);
+ this.rootTasks.add(defaultTask);
+ this.currentTask = null;
this.leafOperatorToFollowingWork = new LinkedHashMap<Operator<?>, BaseWork>();
this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, SparkEdgeProperty>>();
this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>();
@@ -157,8 +178,8 @@ public class GenSparkProcContext impleme
this.clonedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
this.fileSinkSet = new LinkedHashSet<FileSinkOperator>();
this.connectedReduceSinks = new LinkedHashSet<ReduceSinkOperator>();
-
- rootTasks.add(currentTask);
+ this.opToParentMap = new LinkedHashMap<Operator<?>, Operator<?>>();
+ this.opToTaskMap = new LinkedHashMap<Operator<?>, SparkTask>();
+ this.tempTS = new LinkedHashSet<TableScanOperator>();
}
-
}
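To make the new bookkeeping concrete, here is a standalone toy sketch, with plain strings and maps standing in for Hive's Operator and SparkTask (the stage labels are invented), of how defaultTask and opToTaskMap are meant to relate across the compiler passes:

import java.util.LinkedHashMap;
import java.util.Map;

public class ContextSketch {
  public static void main(String[] args) {
    // Stand-ins for GenSparkProcContext.opToTaskMap and defaultTask.
    Map<String, String> opToTask = new LinkedHashMap<String, String>();
    String defaultTask = "Stage-2";

    // Pass 1 (SparkTableScanProcessor): every real TS starts on the default task.
    opToTask.put("TS[src]", defaultTask);

    // Splitting (SparkMultiInsertionProcessor): each temporary TS gets its own
    // child task, dependent on the default task.
    opToTask.put("TS[tmp1]", "Stage-4");
    opToTask.put("TS[tmp2]", "Stage-5");

    // Later passes record each FileSinkOperator under the task that is current
    // when the walk reaches it, so processFileSink can look up the right task
    // instead of relying on a single global currentTask.
    opToTask.put("FS[dest1]", opToTask.get("TS[tmp1]"));
    opToTask.put("FS[dest2]", opToTask.get("TS[tmp2]"));

    System.out.println(opToTask);
    // {TS[src]=Stage-2, TS[tmp1]=Stage-4, TS[tmp2]=Stage-5, FS[dest1]=Stage-4, FS[dest2]=Stage-5}
  }
}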
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Sat Sep 20 05:29:30 2014
@@ -34,14 +34,30 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.*;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.*;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.UnionWork;
/**
* GenSparkUtils is a collection of shared helper methods to produce SparkWork
@@ -157,6 +173,16 @@ public class GenSparkUtils {
return mapWork;
}
+ // Create a MapWork for a temporary TableScanOperator
+ // Basically a thin wrapper around GenMapRedUtils.setTaskPlan.
+ public MapWork createMapWork(TableScanOperator root,
+ SparkWork sparkWork, String path, TableDesc tt_desc) throws SemanticException {
+ MapWork mapWork = new MapWork("Map " + (++sequenceNumber));
+ GenMapRedUtils.setTaskPlan(path, path, root, mapWork, false, tt_desc);
+ sparkWork.add(mapWork);
+ return mapWork;
+ }
+
// this method's main use is to help unit testing this class
protected void setupMapWork(MapWork mapWork, GenSparkProcContext context,
PrunedPartitionList partitions, Operator<? extends OperatorDesc> root,
@@ -251,15 +277,19 @@ public class GenSparkUtils {
throws SemanticException {
ParseContext parseContext = context.parseContext;
+ Preconditions.checkArgument(context.opToTaskMap.containsKey(fileSink),
+ "AssertionError: the fileSink " + fileSink.getName() + " should be in the context");
+
+ SparkTask currentTask = context.opToTaskMap.get(fileSink);
boolean isInsertTable = // is INSERT OVERWRITE TABLE
GenMapRedUtils.isInsertInto(parseContext, fileSink);
HiveConf hconf = parseContext.getConf();
boolean chDir = GenMapRedUtils.isMergeRequired(context.moveTask,
- hconf, fileSink, context.currentTask, isInsertTable);
+ hconf, fileSink, currentTask, isInsertTable);
- Path finalName = GenMapRedUtils.createMoveTask(context.currentTask,
+ Path finalName = GenMapRedUtils.createMoveTask(currentTask,
chDir, fileSink, parseContext, context.moveTask, hconf, context.dependencyTask);
if (chDir) {
@@ -268,13 +298,13 @@ public class GenSparkUtils {
logger.info("using CombineHiveInputformat for the merge job");
GenMapRedUtils.createMRWorkForMergingFiles(fileSink, finalName,
context.dependencyTask, context.moveTask,
- hconf, context.currentTask);
+ hconf, currentTask);
}
FetchTask fetchTask = parseContext.getFetchTask();
- if (fetchTask != null && context.currentTask.getNumChild() == 0) {
+ if (fetchTask != null && currentTask.getNumChild() == 0) {
if (fetchTask.isFetchFrom(fileSink.getConf())) {
- context.currentTask.setFetchSource(true);
+ currentTask.setFetchSource(true);
}
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Sat Sep 20 05:29:30 2014
@@ -50,7 +50,7 @@ import com.google.common.base.Preconditi
* and break the operators into work and tasks along the way.
*
* Cloned from GenTezWork.
- *
+ *
* TODO: need to go thru this to make it fit completely to Spark.
*/
public class GenSparkWork implements NodeProcessor {
@@ -95,6 +95,10 @@ public class GenSparkWork implements Nod
return null;
}
+ if (operator instanceof FileSinkOperator) {
+ context.opToTaskMap.put(operator, context.currentTask);
+ }
+
SparkWork sparkWork = context.currentTask.getWork();
// Right now the work graph is pretty simple. If there is no
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Sat Sep 20 05:29:30 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -72,9 +73,9 @@ import org.apache.hadoop.hive.ql.session
/**
* SparkCompiler translates the operator plan into SparkTasks.
- *
+ *
* Pretty much cloned from TezCompiler.
- *
+ *
* TODO: need to complete and make it fit to Spark.
*/
public class SparkCompiler extends TaskCompiler {
@@ -86,7 +87,7 @@ public class SparkCompiler extends TaskC
@Override
public void init(HiveConf conf, LogHelper console, Hive db) {
super.init(conf, console, db);
-
+
// TODO: Need to check if we require the use of recursive input dirs for union processing
// conf.setBoolean("mapred.input.dir.recursive", true);
// HiveConf.setBoolVar(conf, ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
@@ -103,7 +104,6 @@ public class SparkCompiler extends TaskC
// Create the context for the walker
OptimizeSparkProcContext procCtx
= new OptimizeSparkProcContext(conf, pCtx, inputs, outputs, deque);
-
// create a walker which walks the tree in a DFS manner while maintaining
// the operator stack.
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
@@ -139,9 +139,39 @@ public class SparkCompiler extends TaskC
GenSparkProcContext procCtx = new GenSparkProcContext(
conf, tempParseContext, mvTask, rootTasks, inputs, outputs);
+ // -------------------- First Pass ---------------------
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(new RuleRegExp("TS", TableScanOperator.getOperatorName() + "%"),
+ new SparkTableScanProcessor());
+
+ Dispatcher disp = new DefaultRuleDispatcher(new SparkMultiInsertionProcessor(), opRules, procCtx);
+ ArrayList<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(pCtx.getTopOps().values());
+ GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
+ ogw.startWalking(topNodes, null);
+
+ // ------------------- Second Pass ----------------------
+
+ // Merge tasks upon Join/Union if possible
+ opRules.clear();
+ opRules.put(new RuleRegExp("Join", JoinOperator.getOperatorName() + "%"),
+ new SparkMergeTaskProcessor());
+ opRules.put(new RuleRegExp("Union", UnionOperator.getOperatorName() + "%"),
+ new SparkMergeTaskProcessor());
+ disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+ topNodes = new ArrayList<Node>();
+ topNodes.addAll(procCtx.tempTS); // First process temp TS
+ topNodes.addAll(pCtx.getTopOps().values());
+ ogw = new GenSparkWorkWalker(disp, procCtx);
+ ogw.startWalking(topNodes, null);
+
+
+ // ------------------- Third Pass -----------------------
+
// create a walker which walks the tree in a DFS manner while maintaining
// the operator stack. The dispatcher generates the plan from the operator tree
- Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.clear();
opRules.put(new RuleRegExp("Split Work - ReduceSink",
ReduceSinkOperator.getOperatorName() + "%"), genSparkWork);
@@ -154,28 +184,40 @@ public class SparkCompiler extends TaskC
opRules.put(new RuleRegExp("Handle Analyze Command",
TableScanOperator.getOperatorName() + "%"),
- new SparkProcessAnalyzeTable(GenSparkUtils.getUtils()));
-
- opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"),
- new NodeProcessor() {
- @Override
- public Object process(Node n, Stack<Node> s,
- NodeProcessorCtx procCtx, Object... os) throws SemanticException {
- GenSparkProcContext context = (GenSparkProcContext) procCtx;
- UnionOperator union = (UnionOperator) n;
-
- // simply need to remember that we've seen a union.
- context.currentUnionOperators.add(union);
- return null;
+ new CompositeProcessor(
+ new NodeProcessor() {
+ @Override
+ public Object process(Node nd, Stack<Node> s,
+ NodeProcessorCtx procCtx, Object... no) throws SemanticException {
+ GenSparkProcContext context = (GenSparkProcContext) procCtx;
+ context.currentTask = context.opToTaskMap.get(nd);
+ return null;
+ }
+ },
+ new SparkProcessAnalyzeTable(GenSparkUtils.getUtils())));
+
+ opRules.put(new RuleRegExp("Remember union", UnionOperator.getOperatorName() + "%"),
+ new NodeProcessor() {
+ @Override
+ public Object process(Node n, Stack<Node> s,
+ NodeProcessorCtx procCtx, Object... os) throws SemanticException {
+ GenSparkProcContext context = (GenSparkProcContext) procCtx;
+ UnionOperator union = (UnionOperator) n;
+
+ // simply need to remember that we've seen a union.
+ context.currentUnionOperators.add(union);
+ return null;
+ }
}
- });
+ );
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
- Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
- List<Node> topNodes = new ArrayList<Node>();
+ disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+ topNodes = new ArrayList<Node>();
topNodes.addAll(pCtx.getTopOps().values());
- GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
+ topNodes.addAll(procCtx.tempTS);
+ ogw = new GenSparkWorkWalker(disp, procCtx);
ogw.startWalking(topNodes, null);
// we need to clone some operator plans and remove union operators still
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMergeTaskProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMergeTaskProcessor.java?rev=1626383&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMergeTaskProcessor.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMergeTaskProcessor.java Sat Sep 20 05:29:30 2014
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.util.Map;
+import java.util.Stack;
+
+public class SparkMergeTaskProcessor implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
+ GenSparkProcContext context = (GenSparkProcContext) procCtx;
+ Operator<?> op = (Operator<?>) nd;
+ Map<Operator<?>, SparkTask> opTable = context.opToTaskMap;
+ SparkTask currentTask = opTable.get(context.currentRootOperator);
+ if (!opTable.containsKey(op)) {
+ opTable.put(op, currentTask);
+ } else {
+ // If this op has already been visited: since we visit temporary TS first,
+ // and on the assumption that two paths from two different temporary TS will NOT
+ // meet, the current task must be the default task.
+ // TODO: it would be better if we could prove that they never meet.
+ SparkTask existingTask = opTable.get(op);
+ if (currentTask == context.defaultTask && existingTask != context.defaultTask) {
+ opTable.put(context.currentRootOperator, existingTask);
+ }
+ }
+
+ return null;
+ }
+}
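The merge rule above is easier to see with the Hive types stripped away. A minimal standalone sketch (plain strings stand in for operators and SparkTasks; all names are invented) of the branch that folds a default-task root into an existing temp-TS task:

import java.util.LinkedHashMap;
import java.util.Map;

public class MergeSketch {
  public static void main(String[] args) {
    Map<String, String> opToTask = new LinkedHashMap<String, String>();
    String defaultTask = "default";

    // Temporary TS are walked first, so the union is claimed by their task.
    opToTask.put("TS[tmp]", "task-1");
    opToTask.put("UNION", "task-1");
    // A real TS still sits on the default task.
    opToTask.put("TS[src]", defaultTask);

    // Mirrors SparkMergeTaskProcessor.process when the walk from TS[src]
    // reaches the already-visited UNION:
    String currentRoot = "TS[src]";
    String currentTask = opToTask.get(currentRoot);
    String existingTask = opToTask.get("UNION");
    if (currentTask.equals(defaultTask) && !existingTask.equals(defaultTask)) {
      opToTask.put(currentRoot, existingTask); // merge: re-root onto task-1
    }

    System.out.println(opToTask.get("TS[src]")); // task-1
  }
}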
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java?rev=1626383&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkMultiInsertionProcessor.java Sat Sep 20 05:29:30 2014
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Stack;
+
+
+public class SparkMultiInsertionProcessor implements NodeProcessor {
+
+ private Set<Operator<?>> processed = new HashSet<Operator<?>>();
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ Operator<?> op = (Operator<?>) nd;
+ GenSparkProcContext context = (GenSparkProcContext) procCtx;
+
+ if (context.opToParentMap.containsKey(op)) {
+ splitPlan(op, context);
+ context.opToParentMap.remove(op);
+ }
+
+ return null;
+ }
+
+ /**
+ * Split the plan into two tasks by creating a temporary file between them.
+ *
+ * @param op the operator at which the plan is split (it must have exactly one parent)
+ * @param context processing context
+ */
+ @SuppressWarnings("nls")
+ private void splitPlan(Operator<?> op, GenSparkProcContext context)
+ throws SemanticException {
+ Preconditions.checkArgument(op.getNumParent() == 1,
+ "AssertionError: expecting operator " + op + " to have only one parent," +
+ " but found multiple parents : " + op.getParentOperators());
+ // nested multi-insertion shouldn't happen.
+ SparkTask parentTask = context.defaultTask;
+ SparkTask childTask = (SparkTask) TaskFactory.get(
+ new SparkWork(context.conf.getVar(HiveConf.ConfVars.HIVEQUERYID)), context.conf);
+
+ GenSparkUtils utils = GenSparkUtils.getUtils();
+ ParseContext parseCtx = context.parseContext;
+ parentTask.addDependentTask(childTask);
+
+ // Generate the temporary file name
+ Operator<?> parent = context.opToParentMap.get(op);
+
+ Path taskTmpDir;
+ TableDesc tt_desc;
+
+ if (processed.add(parent)) {
+ taskTmpDir = parseCtx.getContext().getMRTmpPath();
+ tt_desc = PlanUtils.getIntermediateFileTableDesc(PlanUtils
+ .getFieldSchemasFromRowSchema(parent.getSchema(), "temporarycol"));
+ createTempFS(parent, taskTmpDir, tt_desc, parseCtx);
+ } else {
+ FileSinkOperator fs = (FileSinkOperator) parent.getChildOperators().get(0);
+ tt_desc = fs.getConf().getTableInfo();
+ taskTmpDir = fs.getConf().getDirName();
+ }
+
+ TableScanOperator tableScan = createTempTS(parent, op, parseCtx);
+ String streamDesc = taskTmpDir.toUri().toString();
+ context.opToTaskMap.put(tableScan, childTask);
+ context.tempTS.add(tableScan);
+ MapWork mapWork = utils.createMapWork(tableScan, childTask.getWork(), streamDesc, tt_desc);
+ context.rootToWorkMap.put(tableScan, mapWork);
+ }
+
+ private void createTempFS(Operator<? extends OperatorDesc> parent,
+ Path taskTmpDir, TableDesc tt_desc, ParseContext parseCtx) {
+ boolean compressIntermediate =
+ parseCtx.getConf().getBoolVar(HiveConf.ConfVars.COMPRESSINTERMEDIATE);
+ FileSinkDesc desc = new FileSinkDesc(taskTmpDir, tt_desc, compressIntermediate);
+ if (compressIntermediate) {
+ desc.setCompressCodec(parseCtx.getConf().getVar(
+ HiveConf.ConfVars.COMPRESSINTERMEDIATECODEC));
+ desc.setCompressType(parseCtx.getConf().getVar(
+ HiveConf.ConfVars.COMPRESSINTERMEDIATETYPE));
+ }
+ Operator<? extends OperatorDesc> fileSinkOp = GenMapRedUtils.putOpInsertMap(OperatorFactory
+ .get(desc, parent.getSchema()), null, parseCtx);
+
+ // Connect parent to fileSinkOp
+ parent.setChildOperators(Utilities.makeList(fileSinkOp));
+ fileSinkOp.setParentOperators(Utilities.makeList(parent));
+ }
+
+ private TableScanOperator createTempTS(Operator<? extends OperatorDesc> parent,
+ Operator<? extends OperatorDesc> child,
+ ParseContext parseCtx) {
+ // Create a dummy TableScanOperator for the file generated through fileSinkOp
+ RowResolver parentRowResolver =
+ parseCtx.getOpParseCtx().get(parent).getRowResolver();
+ TableScanOperator tableScanOp = (TableScanOperator) GenMapRedUtils.putOpInsertMap(
+ GenMapRedUtils.createTemporaryTableScanOperator(parent.getSchema()),
+ parentRowResolver, parseCtx);
+
+ tableScanOp.setChildOperators(Utilities.makeList(child));
+ child.setParentOperators(Utilities.makeList(tableScanOp));
+
+ return tableScanOp;
+ }
+
+}
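The rewiring performed by splitPlan, createTempFS and createTempTS can be modeled without any of the Hive plumbing (no FileSinkDesc, TableDesc or task wiring). A toy sketch with an invented Op class, reproducing the before/after shape drawn in the SparkTableScanProcessor comment below:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SplitSketch {
  static final class Op {
    final String name;
    List<Op> parents = new ArrayList<Op>();
    List<Op> children = new ArrayList<Op>();
    Op(String name) { this.name = name; }
    @Override public String toString() { return name + " -> " + children; }
  }

  public static void main(String[] args) {
    // Before: the LCA (OP1) fans out to two insertion branches.
    Op lca = new Op("OP1");
    Op branch2 = new Op("OP2");
    Op branch3 = new Op("OP3");
    lca.children.addAll(Arrays.asList(branch2, branch3));
    branch2.parents.add(lca);
    branch3.parents.add(lca);

    // createTempFS, in miniature: OP1's only child becomes a FileSink writing
    // a temp file (done once per parent, hence the `processed` set above).
    Op fs = new Op("FS[tmp]");
    fs.parents.add(lca);
    lca.children = new ArrayList<Op>(Collections.singletonList(fs));

    // createTempTS, in miniature: each branch is re-rooted under its own
    // temporary TableScan over that temp file, placed in a new child task.
    for (Op branch : Arrays.asList(branch2, branch3)) {
      Op ts = new Op("TS[tmp]");
      ts.children.add(branch);
      branch.parents = new ArrayList<Op>(Collections.singletonList(ts));
      System.out.println(ts); // TS[tmp] -> [OP2 -> []], TS[tmp] -> [OP3 -> []]
    }
    System.out.println(lca);  // OP1 -> [FS[tmp] -> []]
  }
}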
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkProcessAnalyzeTable.java Sat Sep 20 05:29:30 2014
@@ -77,8 +77,12 @@ public class SparkProcessAnalyzeTable im
public Object process(Node nd, Stack<Node> stack,
NodeProcessorCtx procContext, Object... nodeOutputs) throws SemanticException {
GenSparkProcContext context = (GenSparkProcContext) procContext;
-
+
TableScanOperator tableScan = (TableScanOperator) nd;
+ // If this tableScan is a generated one for multi-insertion, ignore it
+ if (context.tempTS.contains(tableScan)) {
+ return null;
+ }
ParseContext parseContext = context.parseContext;
Class<? extends InputFormat> inputFormat = parseContext.getTopToTable().get(tableScan)
@@ -167,14 +171,14 @@ public class SparkProcessAnalyzeTable im
*
* It is composed of PartialScanTask followed by StatsTask.
*/
- private void handlePartialScanCommand(TableScanOperator tableScan, ParseContext parseContext,
+ private void handlePartialScanCommand(TableScanOperator tableScan, ParseContext parseContext,
QBParseInfo parseInfo, StatsWork statsWork, GenSparkProcContext context,
Task<StatsWork> statsTask) throws SemanticException {
String aggregationKey = tableScan.getConf().getStatsAggPrefix();
StringBuffer aggregationKeyBuffer = new StringBuffer(aggregationKey);
List<Path> inputPaths = GenMapRedUtils.getInputPathsForPartialScan(parseInfo, aggregationKeyBuffer);
aggregationKey = aggregationKeyBuffer.toString();
-
+
// scan work
PartialScanWork scanWork = new PartialScanWork(inputPaths);
scanWork.setMapperCannotSpanPartns(true);
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java?rev=1626383&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkTableScanProcessor.java Sat Sep 20 05:29:30 2014
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.parse.spark;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.ForwardOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Stack;
+
+public class SparkTableScanProcessor implements NodeProcessor {
+
+ @Override
+ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+ Object... nodeOutputs) throws SemanticException {
+ GenSparkProcContext context = (GenSparkProcContext) procCtx;
+ TableScanOperator tblScan = (TableScanOperator) nd;
+
+ context.opToTaskMap.put(tblScan, context.defaultTask);
+
+ // For multi-table insertion, we first look for the multiple FSs that can be reached
+ // from this TS. While searching, we also record the path to each of these FSs.
+ // Then, we find the LCA for these FSs.
+ //
+ // That is, in scenarios like the following:
+ //
+ // OP 1 (TS, UNION, etc)
+ // / \
+ // OP 2 OP 3
+ //
+ // If we find such an operator, we record all of its children in the context, and unlink
+ // them from this operator later, in SparkMultiInsertionProcessor, after which the tree becomes:
+ //
+ // OP 1 (TS, UNION, FOR, etc)
+ // |
+ // FS
+ //
+ // TS TS
+ // | |
+ // OP 2 OP 3
+ //
+ // where the two branches starting with TS are in different Spark tasks.
+ //
+ // Because of the restrictions on multi-insertion queries, there could only be two
+ // categories of TS here: one through which we can reach multiple FSs, and one through
+ // which we can reach only one FS. All TS in the first category should reach the
+ // same set of FSs.
+ // A further conclusion is that there should be only one LCA for the entire operator tree.
+ //
+ // N.B.: one special case is when OP is ForwardOperator, in which case we shouldn't break
+ // the tree since it's already optimized.
+ Map<FileSinkOperator, Stack<Operator<? extends OperatorDesc>>> fsToPath
+ = new HashMap<FileSinkOperator, Stack<Operator<? extends OperatorDesc>>>();
+ Queue<Stack<Operator<? extends OperatorDesc>>> paths =
+ new LinkedList<Stack<Operator<? extends OperatorDesc>>>();
+ Stack<Operator<? extends OperatorDesc>> p = new Stack<Operator<? extends OperatorDesc>>();
+ p.push(tblScan);
+ paths.offer(p);
+
+ while (!paths.isEmpty()) {
+ Stack<Operator<? extends OperatorDesc>> currPath = paths.poll();
+ Operator<? extends OperatorDesc> currOp = currPath.peek();
+ if (currOp instanceof FileSinkOperator) {
+ FileSinkOperator fsOp = (FileSinkOperator) currOp;
+ // In case multiple paths lead to this FS, we keep the shortest one.
+ // (We could also keep the longest one - it doesn't matter)
+ if (!fsToPath.containsKey(fsOp) || currPath.size() < fsToPath.get(fsOp).size()) {
+ fsToPath.put(fsOp, currPath);
+ }
+ }
+
+ for (Operator<? extends OperatorDesc> nextOp : currOp.getChildOperators()) {
+ Stack<Operator<? extends OperatorDesc>> nextPath = new Stack<Operator<? extends OperatorDesc>>();
+ nextPath.addAll(currPath);
+ nextPath.push(nextOp);
+ paths.offer(nextPath);
+ }
+ }
+
+ if (fsToPath.size() > 1) {
+ // Now, compute the LOWEST height for all these FSs
+ int lowest = -1;
+ for (Map.Entry<FileSinkOperator, Stack<Operator<? extends OperatorDesc>>> e : fsToPath.entrySet()) {
+ if (lowest < 0 || e.getValue().size() < lowest) {
+ lowest = e.getValue().size();
+ }
+ }
+
+ // Now, trim each path that is longer than the lowest down to that length
+ for (Stack<Operator<? extends OperatorDesc>> st : fsToPath.values()) {
+ while (st.size() > lowest) {
+ st.pop();
+ }
+ }
+
+ // Now, pop all paths in lockstep until they meet at the least common ancestor
+ Operator<? extends OperatorDesc> lca;
+ while (true) {
+ lca = null;
+ boolean same = true;
+ for (Stack<Operator<? extends OperatorDesc>> st : fsToPath.values()) {
+ Operator<? extends OperatorDesc> op = st.pop();
+ if (lca == null) {
+ lca = op;
+ } else if (lca != op) {
+ same = false; // but we still need to pop the rest..
+ }
+ }
+ if (same) {
+ break;
+ }
+ }
+
+ Preconditions.checkArgument(lca.getNumChild() > 1,
+ "AssertionError: the LCA should have multiple children, but got " + lca.getNumChild());
+
+ // Special case: don't break if LCA is FOR.
+ if (!(lca instanceof ForwardOperator)) {
+ for (Operator<? extends OperatorDesc> childOp : lca.getChildOperators()) {
+ context.opToParentMap.put(childOp, lca);
+ }
+ }
+ }
+
+ return null;
+ }
+}
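The LCA search above (trim every root-to-FS path to the length of the shortest one, then pop all paths in lockstep until they expose the same node) is independent of Hive's operator types. A self-contained toy version, with invented Node/lca names:

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class LcaSketch {
  static final class Node {
    final String name;
    Node(String name) { this.name = name; }
  }

  // Each path runs root-to-leaf from the bottom of the deque to its head,
  // so pop() removes the deepest node first, as with the Stacks above.
  // All paths must share the same root for the loop to terminate.
  static Node lca(List<Deque<Node>> paths) {
    int lowest = Integer.MAX_VALUE;
    for (Deque<Node> p : paths) {
      lowest = Math.min(lowest, p.size());
    }
    for (Deque<Node> p : paths) {      // trim longer paths down
      while (p.size() > lowest) {
        p.pop();
      }
    }
    while (true) {                     // pop in lockstep until all agree
      Node candidate = null;
      boolean same = true;
      for (Deque<Node> p : paths) {
        Node top = p.pop();
        if (candidate == null) {
          candidate = top;
        } else if (candidate != top) {
          same = false;                // but we still need to pop the rest
        }
      }
      if (same) {
        return candidate;
      }
    }
  }

  public static void main(String[] args) {
    Node ts = new Node("TS"), op1 = new Node("OP1"), op2 = new Node("OP2");
    Node op3 = new Node("OP3"), fs1 = new Node("FS1"), fs2 = new Node("FS2");
    Deque<Node> p1 = new ArrayDeque<Node>();
    for (Node n : Arrays.asList(ts, op1, op2, fs1)) p1.push(n); // head = FS1
    Deque<Node> p2 = new ArrayDeque<Node>();
    for (Node n : Arrays.asList(ts, op1, op3, fs2)) p2.push(n); // head = FS2
    System.out.println(lca(Arrays.asList(p1, p2)).name);        // prints OP1
  }
}

Since every per-FS path starts at the same TS, the lockstep loop is guaranteed to terminate, at worst at that shared root.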
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/insert1.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/insert1.q.out?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/insert1.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/insert1.q.out Sat Sep 20 05:29:30 2014
@@ -265,22 +265,38 @@ insert overwrite table x.insert1 select
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-2 is a root stage
- Stage-3 depends on stages: Stage-2
+ Stage-4 depends on stages: Stage-2
+ Stage-3 depends on stages: Stage-4, Stage-5
Stage-0 depends on stages: Stage-3
- Stage-4 depends on stages: Stage-0
+ Stage-6 depends on stages: Stage-0
Stage-1 depends on stages: Stage-3
- Stage-5 depends on stages: Stage-1
+ Stage-7 depends on stages: Stage-1
+ Stage-5 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-2
Spark
#### A masked pattern was here ####
Vertices:
- Map 1
+ Map 3
Map Operator Tree:
TableScan
alias: insert2
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+
+ Stage: Stage-4
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
Filter Operator
predicate: (key < 10) (type: boolean)
Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
@@ -296,21 +312,6 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.insert1
- Filter Operator
- predicate: ((key > 10) and (key < 20)) (type: boolean)
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- Select Operator
- expressions: key (type: int), value (type: string)
- outputColumnNames: _col0, _col1
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- File Output Operator
- compressed: false
- Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: x.insert1
Stage: Stage-3
Dependency Collection
@@ -325,7 +326,7 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.insert1
- Stage: Stage-4
+ Stage: Stage-6
Stats-Aggr Operator
Stage: Stage-1
@@ -338,9 +339,32 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: x.insert1
- Stage: Stage-5
+ Stage: Stage-7
Stats-Aggr Operator
+ Stage: Stage-5
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 2
+ Map Operator Tree:
+ TableScan
+ Filter Operator
+ predicate: ((key > 10) and (key < 20)) (type: boolean)
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ Select Operator
+ expressions: key (type: int), value (type: string)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: x.insert1
+
PREHOOK: query: -- HIVE-3676
CREATE DATABASE db2
PREHOOK: type: CREATEDATABASE
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/union18.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/union18.q.out?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/union18.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/union18.q.out Sat Sep 20 05:29:30 2014
@@ -34,21 +34,23 @@ INSERT OVERWRITE TABLE DEST2 SELECT unio
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-2 is a root stage
- Stage-3 depends on stages: Stage-2
+ Stage-4 depends on stages: Stage-2
+ Stage-3 depends on stages: Stage-4, Stage-5
Stage-0 depends on stages: Stage-3
- Stage-4 depends on stages: Stage-0
+ Stage-6 depends on stages: Stage-0
Stage-1 depends on stages: Stage-3
- Stage-5 depends on stages: Stage-1
+ Stage-7 depends on stages: Stage-1
+ Stage-5 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-2
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP)
- Union 3 <- Map 4 (NONE), Reducer 2 (NONE)
+ Reducer 4 <- Map 3 (GROUP)
+ Union 5 <- Map 6 (NONE), Reducer 4 (NONE)
#### A masked pattern was here ####
Vertices:
- Map 1
+ Map 3
Map Operator Tree:
TableScan
alias: s1
@@ -64,34 +66,20 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: bigint)
- Map 4
+ Map 6
Map Operator Tree:
TableScan
alias: s2
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: _col0, _col1
- Select Operator
- expressions: _col0 (type: string), _col1 (type: string)
- outputColumnNames: _col0, _col1
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest1
- Select Operator
- expressions: _col0 (type: string), _col1 (type: string), _col1 (type: string)
- outputColumnNames: _col0, _col1, _col2
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest2
- Reducer 2
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Reducer 4
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
@@ -100,28 +88,34 @@ STAGE PLANS:
Select Operator
expressions: 'tst1' (type: string), UDFToString(_col0) (type: string)
outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Union 5
+ Vertex: Union 5
+
+ Stage: Stage-4
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
Select Operator
expressions: _col0 (type: string), _col1 (type: string)
outputColumnNames: _col0, _col1
+ Statistics: Num rows: 501 Data size: 136272 Basic stats: COMPLETE Column stats: PARTIAL
File Output Operator
compressed: false
+ Statistics: Num rows: 501 Data size: 136272 Basic stats: COMPLETE Column stats: PARTIAL
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest1
- Select Operator
- expressions: _col0 (type: string), _col1 (type: string), _col1 (type: string)
- outputColumnNames: _col0, _col1, _col2
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest2
- Union 3
- Vertex: Union 3
Stage: Stage-3
Dependency Collection
@@ -136,7 +130,7 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest1
- Stage: Stage-4
+ Stage: Stage-6
Stats-Aggr Operator
Stage: Stage-1
@@ -149,9 +143,29 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest2
- Stage: Stage-5
+ Stage: Stage-7
Stats-Aggr Operator
+ Stage: Stage-5
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 2
+ Map Operator Tree:
+ TableScan
+ Select Operator
+ expressions: _col0 (type: string), _col1 (type: string), _col1 (type: string)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 501 Data size: 228456 Basic stats: COMPLETE Column stats: PARTIAL
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 501 Data size: 228456 Basic stats: COMPLETE Column stats: PARTIAL
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.dest2
+
PREHOOK: query: FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
UNION ALL
select s2.key as key, s2.value as value from src s2) unionsrc
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/union19.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/union19.q.out?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/union19.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/union19.q.out Sat Sep 20 05:29:30 2014
@@ -34,22 +34,23 @@ INSERT OVERWRITE TABLE DEST2 SELECT unio
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-2 is a root stage
- Stage-3 depends on stages: Stage-2
+ Stage-4 depends on stages: Stage-2
+ Stage-3 depends on stages: Stage-4, Stage-5
Stage-0 depends on stages: Stage-3
- Stage-4 depends on stages: Stage-0
+ Stage-6 depends on stages: Stage-0
Stage-1 depends on stages: Stage-3
- Stage-5 depends on stages: Stage-1
+ Stage-7 depends on stages: Stage-1
+ Stage-5 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-2
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP)
- Reducer 4 <- Union 3 (GROUP)
- Union 3 <- Map 5 (NONE), Reducer 2 (NONE)
+ Reducer 4 <- Map 3 (GROUP)
+ Union 5 <- Map 6 (NONE), Reducer 4 (NONE)
#### A masked pattern was here ####
Vertices:
- Map 1
+ Map 3
Map Operator Tree:
TableScan
alias: s1
@@ -65,37 +66,20 @@ STAGE PLANS:
sort order:
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
value expressions: _col0 (type: bigint)
- Map 5
+ Map 6
Map Operator Tree:
TableScan
alias: s2
Select Operator
expressions: key (type: string), value (type: string)
outputColumnNames: _col0, _col1
- Select Operator
- expressions: _col0 (type: string), _col1 (type: string)
- outputColumnNames: _col0, _col1
- Group By Operator
- aggregations: count(_col1)
- keys: _col0 (type: string)
- mode: hash
- outputColumnNames: _col0, _col1
- Reduce Output Operator
- key expressions: _col0 (type: string)
- sort order: +
- Map-reduce partition columns: _col0 (type: string)
- value expressions: _col1 (type: bigint)
- Select Operator
- expressions: _col0 (type: string), _col1 (type: string), _col1 (type: string)
- outputColumnNames: _col0, _col1, _col2
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest2
- Reducer 2
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Reducer 4
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
@@ -104,30 +88,41 @@ STAGE PLANS:
Select Operator
expressions: 'tst1' (type: string), UDFToString(_col0) (type: string)
outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Union 5
+ Vertex: Union 5
+
+ Stage: Stage-4
+ Spark
+ Edges:
+ Reducer 7 <- Map 1 (GROUP)
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
Select Operator
expressions: _col0 (type: string), _col1 (type: string)
outputColumnNames: _col0, _col1
+ Statistics: Num rows: 501 Data size: 5584 Basic stats: COMPLETE Column stats: PARTIAL
Group By Operator
aggregations: count(_col1)
keys: _col0 (type: string)
mode: hash
outputColumnNames: _col0, _col1
+ Statistics: Num rows: 501 Data size: 48096 Basic stats: COMPLETE Column stats: PARTIAL
Reduce Output Operator
key expressions: _col0 (type: string)
sort order: +
Map-reduce partition columns: _col0 (type: string)
+ Statistics: Num rows: 501 Data size: 48096 Basic stats: COMPLETE Column stats: PARTIAL
value expressions: _col1 (type: bigint)
- Select Operator
- expressions: _col0 (type: string), _col1 (type: string), _col1 (type: string)
- outputColumnNames: _col0, _col1, _col2
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.dest2
- Reducer 4
+ Reducer 7
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
@@ -147,8 +142,6 @@ STAGE PLANS:
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest1
- Union 3
- Vertex: Union 3
Stage: Stage-3
Dependency Collection
@@ -163,7 +156,7 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest1
- Stage: Stage-4
+ Stage: Stage-6
Stats-Aggr Operator
Stage: Stage-1
@@ -176,9 +169,29 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.dest2
- Stage: Stage-5
+ Stage: Stage-7
Stats-Aggr Operator
+ Stage: Stage-5
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 2
+ Map Operator Tree:
+ TableScan
+ Select Operator
+ expressions: _col0 (type: string), _col1 (type: string), _col1 (type: string)
+ outputColumnNames: _col0, _col1, _col2
+ Statistics: Num rows: 501 Data size: 228456 Basic stats: COMPLETE Column stats: PARTIAL
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 501 Data size: 228456 Basic stats: COMPLETE Column stats: PARTIAL
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.dest2
+
PREHOOK: query: FROM (select 'tst1' as key, cast(count(1) as string) as value from src s1
UNION ALL
select s2.key as key, s2.value as value from src s2) unionsrc
Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/union_remove_6.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/union_remove_6.q.out?rev=1626383&r1=1626382&r2=1626383&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/union_remove_6.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/union_remove_6.q.out Sat Sep 20 05:29:30 2014
@@ -64,20 +64,22 @@ insert overwrite table outputTbl2 select
POSTHOOK: type: QUERY
STAGE DEPENDENCIES:
Stage-2 is a root stage
- Stage-3 depends on stages: Stage-2
+ Stage-4 depends on stages: Stage-2
+ Stage-3 depends on stages: Stage-4, Stage-5
Stage-0 depends on stages: Stage-3
Stage-1 depends on stages: Stage-3
+ Stage-5 depends on stages: Stage-2
STAGE PLANS:
Stage: Stage-2
Spark
Edges:
- Reducer 2 <- Map 1 (GROUP)
- Reducer 5 <- Map 4 (GROUP)
- Union 3 <- Reducer 2 (NONE), Reducer 5 (NONE)
+ Reducer 4 <- Map 3 (GROUP)
+ Reducer 7 <- Map 6 (GROUP)
+ Union 5 <- Reducer 4 (NONE), Reducer 7 (NONE)
#### A masked pattern was here ####
Vertices:
- Map 1
+ Map 3
Map Operator Tree:
TableScan
alias: inputtbl1
@@ -98,7 +100,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 0 Data size: 30 Basic stats: PARTIAL Column stats: NONE
value expressions: _col1 (type: bigint)
- Map 4
+ Map 6
Map Operator Tree:
TableScan
alias: inputtbl1
@@ -119,7 +121,7 @@ STAGE PLANS:
Map-reduce partition columns: _col0 (type: string)
Statistics: Num rows: 0 Data size: 30 Basic stats: PARTIAL Column stats: NONE
value expressions: _col1 (type: bigint)
- Reducer 2
+ Reducer 4
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
@@ -129,27 +131,13 @@ STAGE PLANS:
Select Operator
expressions: _col0 (type: string), _col1 (type: bigint)
outputColumnNames: _col0, _col1
- Select Operator
- expressions: _col0 (type: string), _col1 (type: bigint)
- outputColumnNames: _col0, _col1
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.outputtbl1
- Select Operator
- expressions: _col0 (type: string), _col1 (type: bigint)
- outputColumnNames: _col0, _col1
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.outputtbl2
- Reducer 5
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Reducer 7
Reduce Operator Tree:
Group By Operator
aggregations: count(VALUE._col0)
@@ -159,28 +147,34 @@ STAGE PLANS:
Select Operator
expressions: _col0 (type: string), _col1 (type: bigint)
outputColumnNames: _col0, _col1
+ File Output Operator
+ compressed: false
+ table:
+ input format: org.apache.hadoop.mapred.SequenceFileInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe
+ Union 5
+ Vertex: Union 5
+
+ Stage: Stage-4
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 1
+ Map Operator Tree:
+ TableScan
Select Operator
expressions: _col0 (type: string), _col1 (type: bigint)
outputColumnNames: _col0, _col1
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
File Output Operator
compressed: false
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.outputtbl1
- Select Operator
- expressions: _col0 (type: string), _col1 (type: bigint)
- outputColumnNames: _col0, _col1
- File Output Operator
- compressed: false
- table:
- input format: org.apache.hadoop.mapred.TextInputFormat
- output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
- serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- name: default.outputtbl2
- Union 3
- Vertex: Union 3
Stage: Stage-3
Dependency Collection
@@ -205,6 +199,26 @@ STAGE PLANS:
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
name: default.outputtbl2
+ Stage: Stage-5
+ Spark
+#### A masked pattern was here ####
+ Vertices:
+ Map 2
+ Map Operator Tree:
+ TableScan
+ Select Operator
+ expressions: _col0 (type: string), _col1 (type: bigint)
+ outputColumnNames: _col0, _col1
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ File Output Operator
+ compressed: false
+ Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+ table:
+ input format: org.apache.hadoop.mapred.TextInputFormat
+ output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+ serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
+ name: default.outputtbl2
+
PREHOOK: query: FROM (
SELECT key, count(1) as values from inputTbl1 group by key
UNION ALL