Posted to commits@hive.apache.org by gu...@apache.org on 2013/09/27 22:13:28 UTC

svn commit: r1527058 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez: DagUtils.java TezTask.java

Author: gunther
Date: Fri Sep 27 20:13:28 2013
New Revision: 1527058

URL: http://svn.apache.org/r1527058
Log:
HIVE-5386: Update DagUtils/Tez task to reflect tez api changes (Gunther Hagleitner)

Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1527058&r1=1527057&r2=1527058&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Sep 27 20:13:28 2013
@@ -67,14 +67,17 @@ import org.apache.tez.dag.api.InputDescr
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.mapreduce.partition.MRPartitioner;
 
 /**
  * DagUtils. DagUtils is a collection of helper methods to convert
@@ -160,7 +163,7 @@ public class DagUtils {
             DataSourceType.PERSISTED,
             SchedulingType.SEQUENTIAL,
             new OutputDescriptor(OnFileSortedOutput.class.getName()),
-            new InputDescriptor(ShuffledMergedInput.class.getName()));
+            new InputDescriptor(ShuffledMergedInputLegacy.class.getName()));
     return new Edge(v, w, edgeProperty);
   }
 
@@ -205,15 +208,19 @@ public class DagUtils {
 
     // finally create the vertex
     Vertex map = null;
+    byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
     if (inputSplitInfo.getNumTasks() != 0) {
       map = new Vertex("Map "+seqNo,
           new ProcessorDescriptor(MapProcessor.class.getName()).
-               setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
+               setUserPayload(serializedConf),
           inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(conf));
       Map<String, String> environment = new HashMap<String, String>();
       MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
       map.setTaskEnvironment(environment);
       map.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
+      map.addInput("in_"+seqNo, 
+          new InputDescriptor(MRInputLegacy.class.getName()).
+               setUserPayload(serializedConf));
 
       map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
 
@@ -520,6 +527,7 @@ public class DagUtils {
     conf.set(MRJobConfig.OUTPUT_VALUE_CLASS, BytesWritable.class.getName());
 
     conf.set("mapred.partitioner.class", HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPARTITIONER));
+    conf.set("tez.runtime.partitioner.class", MRPartitioner.class.getName());
 
     return conf;
   }
@@ -561,20 +569,30 @@ public class DagUtils {
    */
   public static Vertex createVertex(JobConf conf, BaseWork work,
       Path scratchDir, int seqNo, LocalResource appJarLr, List<LocalResource> additionalLr,
-      FileSystem fileSystem, Context ctx) throws Exception {
+      FileSystem fileSystem, Context ctx, boolean hasChildren) throws Exception {
 
+    Vertex v = null;
     // simply dispatch the call to the right method for the actual (sub-) type of
     // BaseWork.
     if (work instanceof MapWork) {
-      return createVertex(conf, (MapWork) work, seqNo, appJarLr,
+      v = createVertex(conf, (MapWork) work, seqNo, appJarLr,
           additionalLr, fileSystem, scratchDir, ctx);
     } else if (work instanceof ReduceWork) {
-      return createVertex(conf, (ReduceWork) work, seqNo, appJarLr,
+      v = createVertex(conf, (ReduceWork) work, seqNo, appJarLr,
           additionalLr, fileSystem, scratchDir, ctx);
     } else {
       assert false;
       return null;
     }
+
+    // final vertices need to have at least one output
+    if (!hasChildren) {
+      v.addOutput("out_"+seqNo, 
+          new OutputDescriptor(MROutput.class.getName())
+               .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
+    }
+
+    return v;
   }
 
   /**

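For context beyond the patch hunks above, here is a minimal sketch (not part of this commit) of how a map vertex ends up wired after this change: the serialized job conf is reused as the user payload, an MRInputLegacy input is attached explicitly to the vertex, and a leaf vertex additionally gets an MROutput. Only API calls that appear in the diff above are used; the class name VertexWiringSketch and the helper buildMapVertex are hypothetical.

import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.map.MapProcessor;

public class VertexWiringSketch {

  // Hypothetical helper: wires a map vertex the way the patched DagUtils does.
  static Vertex buildMapVertex(JobConf conf, int seqNo, int numTasks,
      boolean hasChildren) throws Exception {
    // Serialize the job conf once and reuse it for the processor and input payloads.
    byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);

    Vertex map = new Vertex("Map " + seqNo,
        new ProcessorDescriptor(MapProcessor.class.getName())
            .setUserPayload(serializedConf),
        numTasks, MRHelpers.getMapResource(conf));

    // With the updated Tez API the MR input is attached explicitly to the vertex.
    map.addInput("in_" + seqNo,
        new InputDescriptor(MRInputLegacy.class.getName())
            .setUserPayload(serializedConf));

    // Final (leaf) vertices need at least one output; non-leaves are connected
    // to their children through shuffle edges instead.
    if (!hasChildren) {
      map.addOutput("out_" + seqNo,
          new OutputDescriptor(MROutput.class.getName())
              .setUserPayload(serializedConf));
    }
    return map;
  }
}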
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1527058&r1=1527057&r2=1527058&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Fri Sep 27 20:13:28 2013
@@ -159,10 +159,12 @@ public class TezTask extends Task<TezWor
     int i = ws.size();
     for (BaseWork w: ws) {
 
+      boolean isFinal = work.getLeaves().contains(w);
+
       // translate work to vertex
       JobConf wxConf = DagUtils.initializeVertexConf(conf, w);
       Vertex wx = DagUtils.createVertex(wxConf, w, tezDir,
-          i--, appJarLr, additionalLr, fs, ctx);
+          i--, appJarLr, additionalLr, fs, ctx, !isFinal);
       dag.addVertex(wx);
       workToVertex.put(w, wx);
       workToConf.put(w, wxConf);
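
On the caller side, the TezTask change above derives the new hasChildren flag by checking whether the work item is a leaf of the TezWork graph. Below is a rough sketch of that loop with the surrounding method trimmed to the arguments it needs; the class name LeafOutputSketch and the helper signature are hypothetical, and the package names assumed for the Hive plan classes are the usual ones (org.apache.hadoop.hive.ql.plan).

import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Vertex;

class LeafOutputSketch {

  // Hypothetical helper mirroring the patched TezTask loop.
  static void addVertices(DAG dag, TezWork work, List<BaseWork> ws, JobConf conf,
      Path tezDir, LocalResource appJarLr, List<LocalResource> additionalLr,
      FileSystem fs, Context ctx, Map<BaseWork, Vertex> workToVertex,
      Map<BaseWork, JobConf> workToConf) throws Exception {
    int i = ws.size();
    for (BaseWork w : ws) {
      // A leaf of the work graph is a "final" vertex: it has no children, so
      // createVertex(..., hasChildren == false) attaches an MROutput to it.
      boolean isFinal = work.getLeaves().contains(w);

      JobConf wxConf = DagUtils.initializeVertexConf(conf, w);
      Vertex wx = DagUtils.createVertex(wxConf, w, tezDir, i--, appJarLr,
          additionalLr, fs, ctx, !isFinal);
      dag.addVertex(wx);
      workToVertex.put(w, wx);
      workToConf.put(w, wxConf);
    }
  }
}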