You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/11/20 22:27:59 UTC

svn commit: r1543957 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql: exec/ exec/tez/ optimizer/ parse/ plan/

Author: gunther
Date: Wed Nov 20 21:27:58 2013
New Revision: 1543957

URL: http://svn.apache.org/r1543957
Log:
HIVE-5861: Fix exception in multi insert statement on Tez (Gunther Hagleitner)

Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Wed Nov 20 21:27:58 2013
@@ -21,13 +21,18 @@ package org.apache.hadoop.hive.ql.exec;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.mapred.OutputCollector;
 
 public class OperatorUtils {
 
+  private static final Log LOG = LogFactory.getLog(OperatorUtils.class);
+
   public static <T> Set<T> findOperators(Operator<?> start, Class<T> clazz) {
     return findOperators(start, clazz, new HashSet<T>());
   }
@@ -63,11 +68,29 @@ public class OperatorUtils {
       return;
     }
     for (Operator<? extends OperatorDesc> op : childOperators) {
-      if(op.getName().equals(ReduceSinkOperator.getOperatorName())) { //TODO:
+      if(op.getName().equals(ReduceSinkOperator.getOperatorName())) {
         ((ReduceSinkOperator)op).setOutputCollector(out);
       } else {
         setChildrenCollector(op.getChildOperators(), out);
       }
     }
   }
+
+  public static void setChildrenCollector(List<Operator<? extends OperatorDesc>> childOperators, Map<String, OutputCollector> outMap) {
+    if (childOperators == null) {
+      return;
+    }
+    for (Operator<? extends OperatorDesc> op : childOperators) {
+      if(op.getName().equals(ReduceSinkOperator.getOperatorName())) {
+        ReduceSinkOperator rs = ((ReduceSinkOperator)op);
+        if (outMap.containsKey(rs.getConf().getOutputName())) {
+          LOG.info("Setting output collector: " + rs + " --> " 
+            + rs.getConf().getOutputName());
+          rs.setOutputCollector(outMap.get(rs.getConf().getOutputName()));
+        }
+      } else {
+        setChildrenCollector(op.getChildOperators(), outMap);
+      }
+    }
+  }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Nov 20 21:27:58 2013
@@ -64,9 +64,9 @@ public class MapRecordProcessor  extends
 
   @Override
   void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
-      OutputCollector out){
+      Map<String, OutputCollector> outMap){
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
-    super.init(jconf, mrReporter, inputs, out);
+    super.init(jconf, mrReporter, inputs, outMap);
 
     //Update JobConf using MRInput, info like filename comes via this
     MRInputLegacy mrInput = getMRInput(inputs);
@@ -124,7 +124,7 @@ public class MapRecordProcessor  extends
         }
       }
 
-      OperatorUtils.setChildrenCollector(mapOp.getChildOperators(), out);
+      OperatorUtils.setChildrenCollector(mapOp.getChildOperators(), outMap);
       mapOp.setReporter(reporter);
       MapredContext.get().setReporter(reporter);
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Wed Nov 20 21:27:58 2013
@@ -39,7 +39,7 @@ public abstract class RecordProcessor  {
 
   protected JobConf jconf;
   protected Map<String, LogicalInput> inputs;
-  protected OutputCollector out;
+  protected Map<String, OutputCollector> outMap;
 
   public static final Log l4j = LogFactory.getLog(RecordProcessor.class);
 
@@ -63,11 +63,11 @@ public abstract class RecordProcessor  {
    * @param out
    */
   void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
-      OutputCollector out){
+      Map<String, OutputCollector> outMap){
     this.jconf = jconf;
     this.reporter = mrReporter;
     this.inputs = inputs;
-    this.out = out;
+    this.outMap = outMap;
 
     // Allocate the bean at the beginning -
     memoryMXBean = ManagementFactory.getMemoryMXBean();

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Wed Nov 20 21:27:58 2013
@@ -90,9 +90,9 @@ public class ReduceRecordProcessor  exte
 
   @Override
   void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
-      OutputCollector out){
+      Map<String, OutputCollector> outMap){
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
-    super.init(jconf, mrReporter, inputs, out);
+    super.init(jconf, mrReporter, inputs, outMap);
 
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
 
@@ -162,8 +162,8 @@ public class ReduceRecordProcessor  exte
       if (dummyOps != null) {
         children.addAll(dummyOps);
       }
-      OperatorUtils.setChildrenCollector(children, out);
-      
+      OperatorUtils.setChildrenCollector(children, outMap);
+
       reducer.setReporter(reporter);
       MapredContext.get().setReporter(reporter);
 
@@ -329,10 +329,6 @@ public class ReduceRecordProcessor  exte
     if (!abort) {
       abort = execContext.getIoCxt().getIOExceptions();
     }
-    // No row was processed
-    if (out == null) {
-      l4j.trace("Close called no row");
-    }
 
     try {
       if (groupKey != null) {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Wed Nov 20 21:27:58 2013
@@ -17,12 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.KVOutputCollector;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -89,17 +91,14 @@ public class TezProcessor implements Log
 
     LOG.info("Running map: " + processorContext.getUniqueIdentifier());
 
-    if(outputs.size() > 1) {
-          throw new IOException("Cannot handle more than one output"
-          + ", outputCount=" + outputs.size());
-    }
-
-    LogicalOutput out = outputs.values().iterator().next();
-
+    Map<String, OutputCollector> outMap = new HashMap<String, OutputCollector>();
 
-
-    KeyValueWriter kvWriter = (KeyValueWriter)out.getWriter();
-    OutputCollector collector = new KVOutputCollector(kvWriter);
+    for (String outputName: outputs.keySet()) {
+      LOG.info("Handling output: " + outputName);
+      KeyValueWriter kvWriter = (KeyValueWriter) outputs.get(outputName).getWriter();
+      OutputCollector collector = new KVOutputCollector(kvWriter);
+      outMap.put(outputName, collector);
+    }
 
     if(isMap){
       rproc = new MapRecordProcessor();
@@ -109,7 +108,7 @@ public class TezProcessor implements Log
     }
 
     MRTaskReporter mrReporter = new MRTaskReporter(processorContext);
-    rproc.init(jobConf, mrReporter, inputs, collector);
+    rproc.init(jobConf, mrReporter, inputs, outMap);
     rproc.run();
 
     //done - output does not need to be committed as hive does not use outputcommitter

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Wed Nov 20 21:27:58 2013
@@ -88,6 +88,10 @@ public class ReduceSinkMapJoinProc imple
           // link the work with the work associated with the reduce sink that triggered this rule
           TezWork tezWork = context.currentTask.getWork();
           tezWork.connect(parentWork, myWork, EdgeType.BROADCAST_EDGE);
+
+          // remember the output name of the reduce sink
+          parentRS.getConf().setOutputName(myWork.getName());
+
         } else {
           List<BaseWork> linkWorkList = context.linkOpWithWorkMap.get(childOp);
           if (linkWorkList == null) {
@@ -95,6 +99,14 @@ public class ReduceSinkMapJoinProc imple
           }
           linkWorkList.add(parentWork);
           context.linkOpWithWorkMap.put(childOp, linkWorkList);
+
+          List<ReduceSinkOperator> reduceSinks 
+            = context.linkWorkWithReduceSinkMap.get(parentWork);
+          if (reduceSinks == null) {
+            reduceSinks = new ArrayList<ReduceSinkOperator>();
+          }
+          reduceSinks.add(parentRS);
+          context.linkWorkWithReduceSinkMap.put(parentWork, reduceSinks);
         }
 
         break;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Wed Nov 20 21:27:58 2013
@@ -24,10 +24,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Stack;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -89,6 +91,10 @@ public class GenTezProcContext implement
   // traversing an operator tree
   public final Map<Operator<?>, List<BaseWork>> linkOpWithWorkMap;
 
+  // a map to keep track of what reduce sinks have to be hooked up to
+  // map join work
+  public final Map<BaseWork, List<ReduceSinkOperator>> linkWorkWithReduceSinkMap;
+
   // a map that maintains operator (file-sink or reduce-sink) to work mapping
   public final Map<Operator<?>, BaseWork> operatorWorkMap;
 
@@ -102,6 +108,15 @@ public class GenTezProcContext implement
   // used to group dependent tasks for multi table inserts
   public final DependencyCollectionTask dependencyTask;
 
+  // root of last multi-child operator encountered
+  public Stack<Operator<?>> lastRootOfMultiChildOperator;
+
+  // branches of current multi-child operator
+  public Stack<Integer> currentBranchCount;
+
+  // work generated for last multi-child operator
+  public Stack<BaseWork> lastWorkForMultiChildOperator;
+
   @SuppressWarnings("unchecked")
   public GenTezProcContext(HiveConf conf, ParseContext parseContext,
       List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -117,10 +132,14 @@ public class GenTezProcContext implement
     this.leafOperatorToFollowingWork = new HashMap<Operator<?>, BaseWork>();
     this.rootOperators = rootOperators;
     this.linkOpWithWorkMap = new HashMap<Operator<?>, List<BaseWork>>();
+    this.linkWorkWithReduceSinkMap = new HashMap<BaseWork, List<ReduceSinkOperator>>();
     this.operatorWorkMap = new HashMap<Operator<?>, BaseWork>();
     this.mapJoinParentMap = new HashMap<MapJoinOperator, List<Operator<?>>>();
     this.linkChildOpWithDummyOp = new HashMap<Operator<?>, List<Operator<?>>>();
     this.dependencyTask = (DependencyCollectionTask)
         TaskFactory.get(new DependencyCollectionWork(), conf);
+    this.lastRootOfMultiChildOperator = new Stack<Operator<?>>();
+    this.currentBranchCount = new Stack<Integer>();
+    this.lastWorkForMultiChildOperator = new Stack<BaseWork>();
   }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Wed Nov 20 21:27:58 2013
@@ -71,17 +71,18 @@ public class GenTezWork implements NodeP
     // packing into a vertex, typically a table scan, union or join
     Operator<?> root = context.currentRootOperator;
     if (root == null) {
-      // if there are no more rootOperators we're dealing with multiple
-      // file sinks off of the same table scan. Bail.
-      if (context.rootOperators.isEmpty()) {
-        return null;
-      }
-
       // null means that we're starting with a new table scan
       // the graph walker walks the rootOperators in the same
       // order so we can just take the next
       context.preceedingWork = null;
-      root = context.rootOperators.pop();
+
+      // if there are branches remaining we can't pop the next
+      // root operator yet.
+      if (context.currentBranchCount.isEmpty()
+          || (!context.lastWorkForMultiChildOperator.isEmpty()
+              && context.lastWorkForMultiChildOperator.peek() == null)) {
+        root = context.rootOperators.pop();
+      }
     }
 
     LOG.debug("Root operator: " + root);
@@ -93,18 +94,51 @@ public class GenTezWork implements NodeP
     // a reduce vertex
     BaseWork work;
     if (context.preceedingWork == null) {
-      assert root.getParentOperators().isEmpty();
-      MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
-      LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
-
-      // map work starts with table scan operators
-      assert root instanceof TableScanOperator;
-      String alias = ((TableScanOperator)root).getConf().getAlias();
-
-      GenMapRedUtils.setMapWork(mapWork, context.parseContext,
-          context.inputs, null, root, alias, context.conf, false);
-      tezWork.add(mapWork);
-      work = mapWork;
+      if (root == null) {
+        // this is the multi-insert case. we need to reuse the last
+        // table scan work.
+        root = context.lastRootOfMultiChildOperator.peek();
+        work = context.lastWorkForMultiChildOperator.peek();
+        LOG.debug("Visiting additional branch in: "+root);
+
+      } else {
+        assert root.getParentOperators().isEmpty();
+        MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
+        LOG.debug("Adding map work (" + mapWork.getName() + ") for " + root);
+
+        // map work starts with table scan operators
+        assert root instanceof TableScanOperator;
+        String alias = ((TableScanOperator)root).getConf().getAlias();
+
+        GenMapRedUtils.setMapWork(mapWork, context.parseContext,
+            context.inputs, null, root, alias, context.conf, false);
+        tezWork.add(mapWork);
+        work = mapWork;
+
+        // remember this table scan and work item. this is needed for multiple
+        // insert statements where multiple operator pipelines hang off a single
+        // table scan
+        if (!context.lastWorkForMultiChildOperator.isEmpty()
+            && context.lastWorkForMultiChildOperator.peek() == null) {
+          LOG.debug("Capturing current work for 'multiple branches' case");
+          context.lastWorkForMultiChildOperator.pop();
+          context.lastWorkForMultiChildOperator.push(work);
+        }
+      }
+
+      if (!context.currentBranchCount.isEmpty()) {
+        // we've handled one branch. Adjust the counts.
+        int branches = context.currentBranchCount.pop();
+        if (--branches != 0) {
+          LOG.debug("Remaining branches: "+branches);
+          context.currentBranchCount.push(branches);
+        } else {
+          LOG.debug("No more remaining branches.");
+          context.lastRootOfMultiChildOperator.pop();
+          context.lastWorkForMultiChildOperator.pop();
+        }
+      }
+
     } else {
       assert !root.getParentOperators().isEmpty();
       ReduceWork reduceWork = new ReduceWork("Reducer "+ (++sequenceNumber));
@@ -119,6 +153,8 @@ public class GenTezWork implements NodeP
       assert context.parentOfRoot instanceof ReduceSinkOperator;
       ReduceSinkOperator reduceSink = (ReduceSinkOperator) context.parentOfRoot;
 
+      LOG.debug("Setting up reduce sink: " + reduceSink);
+
       reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
 
       // need to fill in information about the key and value in the reducer
@@ -128,12 +164,25 @@ public class GenTezWork implements NodeP
       reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
            context.preceedingWork.getName());
 
+      // remember the output name of the reduce sink
+      reduceSink.getConf().setOutputName(reduceWork.getName());
+
       tezWork.add(reduceWork);
       tezWork.connect(
           context.preceedingWork,
           reduceWork, EdgeType.SIMPLE_EDGE);
 
       work = reduceWork;
+
+      // remember this work item. this is needed for multiple
+      // insert statements where multiple operator pipelines hang off a forward
+      // operator
+      if (!context.lastWorkForMultiChildOperator.isEmpty()
+          && context.lastWorkForMultiChildOperator.peek() == null) {
+        LOG.debug("Capturing current work for 'multiple branches' case");
+        context.lastWorkForMultiChildOperator.pop();
+        context.lastWorkForMultiChildOperator.push(work);
+      }
     }
 
     // We're scanning the operator from table scan to final file sink.
@@ -162,9 +211,11 @@ public class GenTezWork implements NodeP
       // remember which parent belongs to which tag
       rWork.getTagToInput().put(rs.getConf().getTag(), work.getName());
 
+      // remember the output name of the reduce sink
+      rs.getConf().setOutputName(rWork.getName());
+
       // add dependency between the two work items
-      tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator),
-         EdgeType.SIMPLE_EDGE);
+      tezWork.connect(work, rWork, EdgeType.SIMPLE_EDGE);
     }
 
     // This is where we cut the tree as described above. We also remember that
@@ -183,6 +234,7 @@ public class GenTezWork implements NodeP
       context.currentRootOperator = operator.getChildOperators().get(0);
       context.preceedingWork = work;
     } else {
+      LOG.debug("Leaf operator - resetting context: " + context.currentRootOperator);
       context.parentOfRoot = null;
       context.currentRootOperator = null;
       context.preceedingWork = null;
@@ -214,6 +266,13 @@ public class GenTezWork implements NodeP
       }
       for (BaseWork parentWork : linkWorkList) {
         tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE);
+        
+        // need to set up output name for reduce sink now that we know the name
+        // of the downstream work
+        for (ReduceSinkOperator r: 
+               context.linkWorkWithReduceSinkMap.get(parentWork)) {
+          r.getConf().setOutputName(work.getName());
+        }
       }
     }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Wed Nov 20 21:27:58 2013
@@ -34,10 +34,12 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.ForwardOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -153,6 +155,40 @@ public class TezCompiler extends TaskCom
       }
     });
 
+    opRules.put(new RuleRegExp("Setup table scan",
+        TableScanOperator.getOperatorName() + "%"), new NodeProcessor()
+    {
+      @Override
+      public Object process(Node n, Stack<Node> s,
+          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
+        GenTezProcContext context = (GenTezProcContext) procCtx;
+        TableScanOperator tableScan = (TableScanOperator) n;
+        LOG.debug("TableScan operator ("+tableScan
+            +"). Number of branches: "+tableScan.getNumChild());
+        context.lastRootOfMultiChildOperator.push(tableScan);
+        context.currentBranchCount.push(tableScan.getNumChild());
+        context.lastWorkForMultiChildOperator.push(null);
+        return null;
+      }
+    });
+
+    opRules.put(new RuleRegExp("Handle Forward opertor",
+        ForwardOperator.getOperatorName() + "%"), new NodeProcessor()
+    {
+      @Override
+      public Object process(Node n, Stack<Node> s,
+          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
+        GenTezProcContext context = (GenTezProcContext) procCtx;
+        ForwardOperator forward = (ForwardOperator) n;
+        LOG.debug("Forward operator ("+forward+
+            "). Number of branches: "+forward.getNumChild());
+        context.lastRootOfMultiChildOperator.push(context.currentRootOperator);
+        context.currentBranchCount.push(forward.getNumChild());
+        context.lastWorkForMultiChildOperator.push(null);
+        return null;
+      }
+    });
+
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=1543957&r1=1543956&r2=1543957&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Wed Nov 20 21:27:58 2013
@@ -60,6 +60,12 @@ public class ReduceSinkDesc extends Abst
   private int numDistributionKeys;
 
   /**
+   * Used in tez. Holds the name of the output
+   * that this reduce sink is writing to.
+   */
+  private String outputName;
+
+  /**
    * The partition columns (CLUSTER BY or DISTRIBUTE BY in Hive language).
    * Partition columns decide the reducer that the current row goes to.
    * Partition columns are not passed to reducer.
@@ -273,4 +279,12 @@ public class ReduceSinkDesc extends Abst
       List<List<Integer>> distinctColumnIndices) {
     this.distinctColumnIndices = distinctColumnIndices;
   }
+
+  public String getOutputName() {
+    return outputName;
+  }
+
+  public void setOutputName(String outputName) {
+    this.outputName = outputName;
+  }
 }