You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/11/20 22:27:59 UTC
svn commit: r1543957 - in
/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql: exec/ exec/tez/
optimizer/ parse/ plan/
Author: gunther
Date: Wed Nov 20 21:27:58 2013
New Revision: 1543957
URL: http://svn.apache.org/r1543957
Log:
HIVE-5861: Fix exception in multi insert statement on Tez (Gunther Hagleitner)
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Wed Nov 20 21:27:58 2013
@@ -21,13 +21,18 @@ package org.apache.hadoop.hive.ql.exec;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.mapred.OutputCollector;
public class OperatorUtils {
+ private static final Log LOG = LogFactory.getLog(OperatorUtils.class);
+
public static <T> Set<T> findOperators(Operator<?> start, Class<T> clazz) {
return findOperators(start, clazz, new HashSet<T>());
}
@@ -63,11 +68,29 @@ public class OperatorUtils {
return;
}
for (Operator<? extends OperatorDesc> op : childOperators) {
- if(op.getName().equals(ReduceSinkOperator.getOperatorName())) { //TODO:
+ if(op.getName().equals(ReduceSinkOperator.getOperatorName())) {
((ReduceSinkOperator)op).setOutputCollector(out);
} else {
setChildrenCollector(op.getChildOperators(), out);
}
}
}
+
+ public static void setChildrenCollector(List<Operator<? extends OperatorDesc>> childOperators, Map<String, OutputCollector> outMap) {
+ if (childOperators == null) {
+ return;
+ }
+ for (Operator<? extends OperatorDesc> op : childOperators) {
+ if(op.getName().equals(ReduceSinkOperator.getOperatorName())) {
+ ReduceSinkOperator rs = ((ReduceSinkOperator)op);
+ if (outMap.containsKey(rs.getConf().getOutputName())) {
+ LOG.info("Setting output collector: " + rs + " --> "
+ + rs.getConf().getOutputName());
+ rs.setOutputCollector(outMap.get(rs.getConf().getOutputName()));
+ }
+ } else {
+ setChildrenCollector(op.getChildOperators(), outMap);
+ }
+ }
+ }
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Nov 20 21:27:58 2013
@@ -64,9 +64,9 @@ public class MapRecordProcessor extends
@Override
void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
- OutputCollector out){
+ Map<String, OutputCollector> outMap){
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
- super.init(jconf, mrReporter, inputs, out);
+ super.init(jconf, mrReporter, inputs, outMap);
//Update JobConf using MRInput, info like filename comes via this
MRInputLegacy mrInput = getMRInput(inputs);
@@ -124,7 +124,7 @@ public class MapRecordProcessor extends
}
}
- OperatorUtils.setChildrenCollector(mapOp.getChildOperators(), out);
+ OperatorUtils.setChildrenCollector(mapOp.getChildOperators(), outMap);
mapOp.setReporter(reporter);
MapredContext.get().setReporter(reporter);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Wed Nov 20 21:27:58 2013
@@ -39,7 +39,7 @@ public abstract class RecordProcessor {
protected JobConf jconf;
protected Map<String, LogicalInput> inputs;
- protected OutputCollector out;
+ protected Map<String, OutputCollector> outMap;
public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
@@ -63,11 +63,11 @@ public abstract class RecordProcessor {
* @param out
*/
void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
- OutputCollector out){
+ Map<String, OutputCollector> outMap){
this.jconf = jconf;
this.reporter = mrReporter;
this.inputs = inputs;
- this.out = out;
+ this.outMap = outMap;
// Allocate the bean at the beginning -
memoryMXBean = ManagementFactory.getMemoryMXBean();
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Wed Nov 20 21:27:58 2013
@@ -90,9 +90,9 @@ public class ReduceRecordProcessor exte
@Override
void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
- OutputCollector out){
+ Map<String, OutputCollector> outMap){
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
- super.init(jconf, mrReporter, inputs, out);
+ super.init(jconf, mrReporter, inputs, outMap);
ObjectCache cache = ObjectCacheFactory.getCache(jconf);
@@ -162,8 +162,8 @@ public class ReduceRecordProcessor exte
if (dummyOps != null) {
children.addAll(dummyOps);
}
- OperatorUtils.setChildrenCollector(children, out);
-
+ OperatorUtils.setChildrenCollector(children, outMap);
+
reducer.setReporter(reporter);
MapredContext.get().setReporter(reporter);
@@ -329,10 +329,6 @@ public class ReduceRecordProcessor exte
if (!abort) {
abort = execContext.getIoCxt().getIOExceptions();
}
- // No row was processed
- if (out == null) {
- l4j.trace("Close called no row");
- }
try {
if (groupKey != null) {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Wed Nov 20 21:27:58 2013
@@ -17,12 +17,14 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.KVOutputCollector;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
@@ -89,17 +91,14 @@ public class TezProcessor implements Log
LOG.info("Running map: " + processorContext.getUniqueIdentifier());
- if(outputs.size() > 1) {
- throw new IOException("Cannot handle more than one output"
- + ", outputCount=" + outputs.size());
- }
-
- LogicalOutput out = outputs.values().iterator().next();
-
+ Map<String, OutputCollector> outMap = new HashMap<String, OutputCollector>();
-
- KeyValueWriter kvWriter = (KeyValueWriter)out.getWriter();
- OutputCollector collector = new KVOutputCollector(kvWriter);
+ for (String outputName: outputs.keySet()) {
+ LOG.info("Handling output: " + outputName);
+ KeyValueWriter kvWriter = (KeyValueWriter) outputs.get(outputName).getWriter();
+ OutputCollector collector = new KVOutputCollector(kvWriter);
+ outMap.put(outputName, collector);
+ }
if(isMap){
rproc = new MapRecordProcessor();
@@ -109,7 +108,7 @@ public class TezProcessor implements Log
}
MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
- rproc.init(jobConf, mrReporter, inputs, collector);
+ rproc.init(jobConf, mrReporter, inputs, outMap);
rproc.run();
//done - output does not need to be committed as hive does not use outputcommitter
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Wed Nov 20 21:27:58 2013
@@ -88,6 +88,10 @@ public class ReduceSinkMapJoinProc imple
// link the work with the work associated with the reduce sink that triggered this rule
TezWork tezWork = context.currentTask.getWork();
tezWork.connect(parentWork, myWork, EdgeType.BROADCAST_EDGE);
+
+ // remember the output name of the reduce sink
+ parentRS.getConf().setOutputName(myWork.getName());
+
} else {
List<BaseWork> linkWorkList = context.linkOpWithWorkMap.get(childOp);
if (linkWorkList == null) {
@@ -95,6 +99,14 @@ public class ReduceSinkMapJoinProc imple
}
linkWorkList.add(parentWork);
context.linkOpWithWorkMap.put(childOp, linkWorkList);
+
+ List<ReduceSinkOperator> reduceSinks
+ = context.linkWorkWithReduceSinkMap.get(parentWork);
+ if (reduceSinks == null) {
+ reduceSinks = new ArrayList<ReduceSinkOperator>();
+ }
+ reduceSinks.add(parentRS);
+ context.linkWorkWithReduceSinkMap.put(parentWork, reduceSinks);
}
break;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Wed Nov 20 21:27:58 2013
@@ -24,10 +24,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Stack;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -89,6 +91,10 @@ public class GenTezProcContext implement
// traversing an operator tree
public final Map<Operator<?>, List<BaseWork>> linkOpWithWorkMap;
+ // a map to keep track of what reduce sinks have to be hooked up to
+ // map join work
+ public final Map<BaseWork, List<ReduceSinkOperator>> linkWorkWithReduceSinkMap;
+
// a map that maintains operator (file-sink or reduce-sink) to work mapping
public final Map<Operator<?>, BaseWork> operatorWorkMap;
@@ -102,6 +108,15 @@ public class GenTezProcContext implement
// used to group dependent tasks for multi table inserts
public final DependencyCollectionTask dependencyTask;
+ // root of last multi child operator encountered
+ public Stack<Operator<?>> lastRootOfMultiChildOperator;
+
+ // branches of current multi-child operator
+ public Stack<Integer> currentBranchCount;
+
+ // work generated for last multi-child operator
+ public Stack<BaseWork> lastWorkForMultiChildOperator;
+
@SuppressWarnings("unchecked")
public GenTezProcContext(HiveConf conf, ParseContext parseContext,
List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -117,10 +132,14 @@ public class GenTezProcContext implement
this.leafOperatorToFollowingWork = new HashMap<Operator<?>, BaseWork>();
this.rootOperators = rootOperators;
this.linkOpWithWorkMap = new HashMap<Operator<?>, List<BaseWork>>();
+ this.linkWorkWithReduceSinkMap = new HashMap<BaseWork, List<ReduceSinkOperator>>();
this.operatorWorkMap = new HashMap<Operator<?>, BaseWork>();
this.mapJoinParentMap = new HashMap<MapJoinOperator, List<Operator<?>>>();
this.linkChildOpWithDummyOp = new HashMap<Operator<?>, List<Operator<?>>>();
this.dependencyTask = (DependencyCollectionTask)
TaskFactory.get(new DependencyCollectionWork(), conf);
+ this.lastRootOfMultiChildOperator = new Stack<Operator<?>>();
+ this.currentBranchCount = new Stack<Integer>();
+ this.lastWorkForMultiChildOperator = new Stack<BaseWork>();
}
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Wed Nov 20 21:27:58 2013
@@ -71,17 +71,18 @@ public class GenTezWork implements NodeP
// packing into a vertex, typically a table scan, union or join
Operator<?> root = context.currentRootOperator;
if (root == null) {
- // if there are no more rootOperators we're dealing with multiple
- // file sinks off of the same table scan. Bail.
- if (context.rootOperators.isEmpty()) {
- return null;
- }
-
// null means that we're starting with a new table scan
// the graph walker walks the rootOperators in the same
// order so we can just take the next
context.preceedingWork = null;
- root = context.rootOperators.pop();
+
+ // if there are branches remaining we can't pop the next
+ // root operator yet.
+ if (context.currentBranchCount.isEmpty()
+ || (!context.lastWorkForMultiChildOperator.isEmpty()
+ && context.lastWorkForMultiChildOperator.peek() == null)) {
+ root = context.rootOperators.pop();
+ }
}
LOG.debug("Root operator: " + root);
@@ -93,18 +94,51 @@ public class GenTezWork implements NodeP
// a reduce vertex
BaseWork work;
if (context.preceedingWork == null) {
- assert root.getParentOperators().isEmpty();
- MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
- LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
-
- // map work starts with table scan operators
- assert root instanceof TableScanOperator;
- String alias = ((TableScanOperator)root).getConf().getAlias();
-
- GenMapRedUtils.setMapWork(mapWork, context.parseContext,
- context.inputs, null, root, alias, context.conf, false);
- tezWork.add(mapWork);
- work = mapWork;
+ if (root == null) {
+ // this is the multi-insert case. we need to reuse the last
+ // table scan work.
+ root = context.lastRootOfMultiChildOperator.peek();
+ work = context.lastWorkForMultiChildOperator.peek();
+ LOG.debug("Visiting additional branch in: "+root);
+
+ } else {
+ assert root.getParentOperators().isEmpty();
+ MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
+ LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
+
+ // map work starts with table scan operators
+ assert root instanceof TableScanOperator;
+ String alias = ((TableScanOperator)root).getConf().getAlias();
+
+ GenMapRedUtils.setMapWork(mapWork, context.parseContext,
+ context.inputs, null, root, alias, context.conf, false);
+ tezWork.add(mapWork);
+ work = mapWork;
+
+ // remember this table scan and work item. this is needed for multiple
+ // insert statements where multiple operator pipelines hang of a single
+ // table scan
+ if (!context.lastWorkForMultiChildOperator.isEmpty()
+ && context.lastWorkForMultiChildOperator.peek() == null) {
+ LOG.debug("Capturing current work for 'multiple branches' case");
+ context.lastWorkForMultiChildOperator.pop();
+ context.lastWorkForMultiChildOperator.push(work);
+ }
+ }
+
+ if (!context.currentBranchCount.isEmpty()) {
+ // we've handled one branch. Adjust the counts.
+ int branches = context.currentBranchCount.pop();
+ if (--branches != 0) {
+ LOG.debug("Remaining branches: "+branches);
+ context.currentBranchCount.push(branches);
+ } else {
+ LOG.debug("No more remaining branches.");
+ context.lastRootOfMultiChildOperator.pop();
+ context.lastWorkForMultiChildOperator.pop();
+ }
+ }
+
} else {
assert !root.getParentOperators().isEmpty();
ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
@@ -119,6 +153,8 @@ public class GenTezWork implements NodeP
assert context.parentOfRoot instanceof ReduceSinkOperator;
ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
+ LOG.debug("Setting up reduce sink: " + reduceSink);
+
reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
// need to fill in information about the key and value in the reducer
@@ -128,12 +164,25 @@ public class GenTezWork implements NodeP
reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
context.preceedingWork.getName());
+ // remember the output name of the reduce sink
+ reduceSink.getConf().setOutputName(reduceWork.getName());
+
tezWork.add(reduceWork);
tezWork.connect(
context.preceedingWork,
reduceWork, EdgeType.SIMPLE_EDGE);
work = reduceWork;
+
+ // remember this work item. this is needed for multiple
+ // insert statements where multiple operator pipelines hang off a forward
+ // operator
+ if (!context.lastWorkForMultiChildOperator.isEmpty()
+ && context.lastWorkForMultiChildOperator.peek() == null) {
+ LOG.debug("Capturing current work for 'multiple branches' case");
+ context.lastWorkForMultiChildOperator.pop();
+ context.lastWorkForMultiChildOperator.push(work);
+ }
}
// We're scanning the operator from table scan to final file sink.
@@ -162,9 +211,11 @@ public class GenTezWork implements NodeP
// remember which parent belongs to which tag
rWork.getTagToInput().put(rs.getConf().getTag(), work.getName());
+ // remember the output name of the reduce sink
+ rs.getConf().setOutputName(rWork.getName());
+
// add dependency between the two work items
- tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator),
- EdgeType.SIMPLE_EDGE);
+ tezWork.connect(work, rWork, EdgeType.SIMPLE_EDGE);
}
// This is where we cut the tree as described above. We also remember that
@@ -183,6 +234,7 @@ public class GenTezWork implements NodeP
context.currentRootOperator = operator.getChildOperators().get(0);
context.preceedingWork = work;
} else {
+ LOG.debug("Leaf operator - resetting context: " + context.currentRootOperator);
context.parentOfRoot = null;
context.currentRootOperator = null;
context.preceedingWork = null;
@@ -214,6 +266,13 @@ public class GenTezWork implements NodeP
}
for (BaseWork parentWork : linkWorkList) {
tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE);
+
+ // need to set up output name for reduce sink now that we know the name
+ // of the downstream work
+ for (ReduceSinkOperator r:
+ context.linkWorkWithReduceSinkMap.get(parentWork)) {
+ r.getConf().setOutputName(work.getName());
+ }
}
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Wed Nov 20 21:27:58 2013
@@ -34,10 +34,12 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.ForwardOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.UnionOperator;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -153,6 +155,40 @@ public class TezCompiler extends TaskCom
}
});
+ opRules.put(new RuleRegExp("Setup table scan",
+ TableScanOperator.getOperatorName() + "%"), new NodeProcessor()
+ {
+ @Override
+ public Object process(Node n, Stack<Node> s,
+ NodeProcessorCtx procCtx, Object... os) throws SemanticException {
+ GenTezProcContext context = (GenTezProcContext) procCtx;
+ TableScanOperator tableScan = (TableScanOperator) n;
+ LOG.debug("TableScan operator ("+tableScan
+ +"). Number of branches: "+tableScan.getNumChild());
+ context.lastRootOfMultiChildOperator.push(tableScan);
+ context.currentBranchCount.push(tableScan.getNumChild());
+ context.lastWorkForMultiChildOperator.push(null);
+ return null;
+ }
+ });
+
+ opRules.put(new RuleRegExp("Handle Forward operator",
+ ForwardOperator.getOperatorName() + "%"), new NodeProcessor()
+ {
+ @Override
+ public Object process(Node n, Stack<Node> s,
+ NodeProcessorCtx procCtx, Object... os) throws SemanticException {
+ GenTezProcContext context = (GenTezProcContext) procCtx;
+ ForwardOperator forward = (ForwardOperator) n;
+ LOG.debug("Forward operator ("+forward+
+ "). Number of branches: "+forward.getNumChild());
+ context.lastRootOfMultiChildOperator.push(context.currentRootOperator);
+ context.currentBranchCount.push(forward.getNumChild());
+ context.lastWorkForMultiChildOperator.push(null);
+ return null;
+ }
+ });
+
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Wed Nov 20 21:27:58 2013
@@ -60,6 +60,12 @@ public class ReduceSinkDesc extends Abst
private int numDistributionKeys;
/**
+ * Used in tez. Holds the name of the output
+ * that this reduce sink is writing to.
+ */
+ private String outputName;
+
+ /**
* The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language).
* Partition columns decide the reducer that the current row goes to.
* Partition columns are not passed to reducer.
@@ -273,4 +279,12 @@ public class ReduceSinkDesc extends Abst
List<List<Integer>> distinctColumnIndices) {
this.distinctColumnIndices = distinctColumnIndices;
}
+
+ public String getOutputName() {
+ return outputName;
+ }
+
+ public void setOutputName(String outputName) {
+ this.outputName = outputName;
+ }
}