You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/09/27 22:13:28 UTC
svn commit: r1527058 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez: DagUtils.java TezTask.java
Author: gunther
Date: Fri Sep 27 20:13:28 2013
New Revision: 1527058
URL: http://svn.apache.org/r1527058
Log:
HIVE-5386: Update DagUtils/Tez task to reflect tez api changes (Gunther Hagleitner)
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1527058&r1=1527057&r2=1527058&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Sep 27 20:13:28 2013
@@ -67,14 +67,17 @@ import org.apache.tez.dag.api.InputDescr
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
+import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.mapreduce.partition.MRPartitioner;
/**
* DagUtils. DagUtils is a collection of helper methods to convert
@@ -160,7 +163,7 @@ public class DagUtils {
DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
new OutputDescriptor(OnFileSortedOutput.class.getName()),
- new InputDescriptor(ShuffledMergedInput.class.getName()));
+ new InputDescriptor(ShuffledMergedInputLegacy.class.getName()));
return new Edge(v, w, edgeProperty);
}
@@ -205,15 +208,19 @@ public class DagUtils {
// finally create the vertex
Vertex map = null;
+ byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
if (inputSplitInfo.getNumTasks() != 0) {
map = new Vertex("Map "+seqNo,
new ProcessorDescriptor(MapProcessor.class.getName()).
- setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
+ setUserPayload(serializedConf),
inputSplitInfo.getNumTasks(), MRHelpers.getMapResource(conf));
Map<String, String> environment = new HashMap<String, String>();
MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
map.setTaskEnvironment(environment);
map.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
+ map.addInput("in_"+seqNo,
+ new InputDescriptor(MRInputLegacy.class.getName()).
+ setUserPayload(serializedConf));
map.setTaskLocationsHint(inputSplitInfo.getTaskLocationHints());
@@ -520,6 +527,7 @@ public class DagUtils {
conf.set(MRJobConfig.OUTPUT_VALUE_CLASS, BytesWritable.class.getName());
conf.set("mapred.partitioner.class", HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPARTITIONER));
+ conf.set("tez.runtime.partitioner.class", MRPartitioner.class.getName());
return conf;
}
@@ -561,20 +569,30 @@ public class DagUtils {
*/
public static Vertex createVertex(JobConf conf, BaseWork work,
Path scratchDir, int seqNo, LocalResource appJarLr, List<LocalResource> additionalLr,
- FileSystem fileSystem, Context ctx) throws Exception {
+ FileSystem fileSystem, Context ctx, boolean hasChildren) throws Exception {
+ Vertex v = null;
// simply dispatch the call to the right method for the actual (sub-) type of
// BaseWork.
if (work instanceof MapWork) {
- return createVertex(conf, (MapWork) work, seqNo, appJarLr,
+ v = createVertex(conf, (MapWork) work, seqNo, appJarLr,
additionalLr, fileSystem, scratchDir, ctx);
} else if (work instanceof ReduceWork) {
- return createVertex(conf, (ReduceWork) work, seqNo, appJarLr,
+ v = createVertex(conf, (ReduceWork) work, seqNo, appJarLr,
additionalLr, fileSystem, scratchDir, ctx);
} else {
assert false;
return null;
}
+
+ // final vertices need to have at least one output
+ if (!hasChildren) {
+ v.addOutput("out_"+seqNo,
+ new OutputDescriptor(MROutput.class.getName())
+ .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
+ }
+
+ return v;
}
/**
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1527058&r1=1527057&r2=1527058&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Fri Sep 27 20:13:28 2013
@@ -159,10 +159,12 @@ public class TezTask extends Task<TezWor
int i = ws.size();
for (BaseWork w: ws) {
+ boolean isFinal = work.getLeaves().contains(w);
+
// translate work to vertex
JobConf wxConf = DagUtils.initializeVertexConf(conf, w);
Vertex wx = DagUtils.createVertex(wxConf, w, tezDir,
- i--, appJarLr, additionalLr, fs, ctx);
+ i--, appJarLr, additionalLr, fs, ctx, !isFinal);
dag.addVertex(wx);
workToVertex.put(w, wx);
workToConf.put(w, wxConf);