You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/08/13 01:56:46 UTC
[2/3] TEZ-1194. Make TezUserPayload user facing for payload
specification (Tsuyoshi Ozawa and bikas)
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 6aa5b99..e10276c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -303,8 +304,8 @@ public class TestHistoryEventsProtoConversion {
Map<String,EdgeManagerPluginDescriptor> sourceEdgeManagers
= new LinkedHashMap<String, EdgeManagerPluginDescriptor>();
sourceEdgeManagers.put("foo", new EdgeManagerPluginDescriptor("bar"));
- sourceEdgeManagers.put("foo1", new EdgeManagerPluginDescriptor("bar1").setUserPayload(
- new String("payload").getBytes()));
+ sourceEdgeManagers.put("foo1", new EdgeManagerPluginDescriptor("bar1")
+ .setUserPayload(new UserPayload(new String("payload").getBytes(), 100)));
VertexParallelismUpdatedEvent event =
new VertexParallelismUpdatedEvent(
TezVertexID.getInstance(
@@ -322,12 +323,14 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getSourceEdgeManagers().size());
Assert.assertEquals(event.getSourceEdgeManagers().get("foo").getClassName(),
deserializedEvent.getSourceEdgeManagers().get("foo").getClassName());
- Assert.assertArrayEquals(event.getSourceEdgeManagers().get("foo").getUserPayload(),
- deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload());
+ Assert.assertNotNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload());
+ Assert.assertNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload().getPayload());
Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getClassName(),
deserializedEvent.getSourceEdgeManagers().get("foo1").getClassName());
- Assert.assertArrayEquals(event.getSourceEdgeManagers().get("foo1").getUserPayload(),
- deserializedEvent.getSourceEdgeManagers().get("foo1").getUserPayload());
+ Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getUserPayload().getVersion(),
+ deserializedEvent.getSourceEdgeManagers().get("foo1").getUserPayload().getVersion());
+ Assert.assertArrayEquals(event.getSourceEdgeManagers().get("foo1").getUserPayload().getPayload(),
+ deserializedEvent.getSourceEdgeManagers().get("foo1").getUserPayload().getPayload());
Assert.assertEquals(event.getVertexLocationHint(),
deserializedEvent.getVertexLocationHint());
logEvents(event, deserializedEvent);
@@ -537,8 +540,8 @@ public class TestHistoryEventsProtoConversion {
// Expected
}
List<TezEvent> events =
- Arrays.asList(new TezEvent(new DataMovementEvent(1, null), new EventMetaData(
- EventProducerConsumerType.SYSTEM, "foo", "bar", null)));
+ Arrays.asList(new TezEvent(new DataMovementEvent(1, null),
+ new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null)));
event = new VertexDataMovementEventsGeneratedEvent(
TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), events);
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
index f4723d2..fde6db3 100644
--- a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
+++ b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
@@ -56,6 +56,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -395,7 +396,7 @@ public class TestTaskExecution {
@Override
public void initialize() throws Exception {
- parseConf(getContext().getUserPayload());
+ parseConf(getContext().getUserPayload().getPayload());
}
private void parseConf(byte[] bytes) {
@@ -692,7 +693,7 @@ public class TestTaskExecution {
TezTaskID taskId = TezTaskID.getInstance(vertexId, 1);
TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 1);
ProcessorDescriptor processorDescriptor = new ProcessorDescriptor(processorClass)
- .setUserPayload(processorConf);
+ .setUserPayload(new UserPayload(processorConf));
TaskSpec taskSpec = new TaskSpec(taskAttemptId, "dagName", "vertexName", processorDescriptor,
new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null);
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index e34001e..e445359 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -39,6 +39,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.DAGClient;
@@ -69,7 +70,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
.next();
KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
kvWriter.write(word, new IntWritable(getContext().getTaskIndex()));
- byte[] userPayload = getContext().getUserPayload();
+ byte[] userPayload = getContext().getUserPayload().getPayload();
if (userPayload != null) {
boolean doLocalityCheck = userPayload[0] > 0 ? true : false;
if (doLocalityCheck) {
@@ -101,8 +102,8 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
while (inputKvReader.next()) {
sum += ((IntWritable) inputKvReader.getCurrentValue()).get();
}
- boolean doLocalityCheck = getContext().getUserPayload()[0] > 0 ? true : false;
- int broadcastSum = getContext().getUserPayload()[1];
+ boolean doLocalityCheck = getContext().getUserPayload().getPayload()[0] > 0 ? true : false;
+ int broadcastSum = getContext().getUserPayload().getPayload()[1];
int expectedSum = broadcastSum + getContext().getTaskIndex();
System.out.println("Index: " + getContext().getTaskIndex() +
" sum: " + sum + " expectedSum: " + expectedSum + " broadcastSum: " + broadcastSum);
@@ -139,7 +140,8 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
numOneToOneTasks = 1;
}
}
- byte[] procPayload = {(byte) (doLocalityCheck ? 1 : 0), 1};
+ byte[] procByte = {(byte) (doLocalityCheck ? 1 : 0), 1};
+ UserPayload procPayload = new UserPayload(procByte);
System.out.println("Using " + numOneToOneTasks + " 1-1 tasks");
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 50db89a..3a081bb 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -62,6 +62,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
@@ -166,7 +167,7 @@ public class FilterLinesByWord extends Configured implements Tool {
stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
stage2Conf.setBoolean("mapred.mapper.new-api", false);
- byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+ UserPayload stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
// Setup stage1 Vertex
Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
@@ -186,8 +187,8 @@ public class FilterLinesByWord extends Configured implements Tool {
// Setup stage2 Vertex
Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
- FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
- .createUserPayloadFromConf(stage2Conf)), 1);
+ FilterByWordOutputProcessor.class.getName()).setUserPayload(
+ MRHelpers.createUserPayloadFromConf(stage2Conf)), 1);
stage2Vertex.setTaskLocalFiles(commonLocalResources);
// Configure the Output for stage2
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index 0040f02..a77713f 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -55,6 +55,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
@@ -157,7 +158,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
stage2Conf.setBoolean("mapred.mapper.new-api", false);
- byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+ UserPayload stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
// Setup stage1 Vertex
Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
index a6a5dbd..fda61b0 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
@@ -43,6 +43,7 @@ import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
@@ -199,8 +200,8 @@ public class IntersectDataGen extends Configured implements Tool {
DAG dag = new DAG("IntersectDataGen");
Vertex genDataVertex = new Vertex("datagen", new ProcessorDescriptor(
- GenDataProcessor.class.getName()).setUserPayload(GenDataProcessor.createConfiguration(
- largeOutSizePerTask, smallOutSizePerTask)), numTasks);
+ GenDataProcessor.class.getName()).setUserPayload(
+ new UserPayload(GenDataProcessor.createConfiguration(largeOutSizePerTask, smallOutSizePerTask))), numTasks);
genDataVertex.addDataSink(STREAM_OUTPUT_NAME,
MROutput.createConfigurer(new Configuration(tezConf),
TextOutputFormat.class, largeOutPath.toUri().toString()).create());
@@ -241,7 +242,7 @@ public class IntersectDataGen extends Configured implements Tool {
@Override
public void initialize() throws Exception {
- byte[] payload = getContext().getUserPayload();
+ byte[] payload = getContext().getUserPayload().getPayload();
ByteArrayInputStream bis = new ByteArrayInputStream(payload);
DataInputStream dis = new DataInputStream(bis);
streamOutputFileSize = dis.readLong();
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 4380840..0562633 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import com.google.common.collect.Maps;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -69,6 +70,7 @@ import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
@@ -82,6 +84,7 @@ import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
/**
@@ -517,7 +520,7 @@ public class MRRSleepJob extends Configured implements Tool {
List<Vertex> vertices = new ArrayList<Vertex>();
- byte[] mapUserPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
+ UserPayload mapUserPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
int numTasks = generateSplitsInAM ? -1 : numMapper;
Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
@@ -531,7 +534,7 @@ public class MRRSleepJob extends Configured implements Tool {
for (int i = 0; i < iReduceStagesCount; ++i) {
Configuration iconf =
intermediateReduceStageConfs[i];
- byte[] iReduceUserPayload = MRHelpers.createUserPayloadFromConf(iconf);
+ UserPayload iReduceUserPayload = MRHelpers.createUserPayloadFromConf(iconf);
Vertex ivertex = new Vertex("ireduce" + (i+1),
new ProcessorDescriptor(ReduceProcessor.class.getName()).
setUserPayload(iReduceUserPayload), numIReducer);
@@ -542,7 +545,7 @@ public class MRRSleepJob extends Configured implements Tool {
Vertex finalReduceVertex = null;
if (numReducer > 0) {
- byte[] reducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
+ UserPayload reducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
ReduceProcessor.class.getName()).setUserPayload(reducePayload), numReducer);
finalReduceVertex.setTaskLocalFiles(commonLocalResources);
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index 5578665..255dcbd 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -61,6 +61,7 @@ import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
@@ -184,7 +185,6 @@ public class TestOrderedWordCount extends Configured implements Tool {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
mapStageConf.writeXml(outputStream);
String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
-
DataSourceDescriptor dsd;
if (generateSplitsInClient) {
mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
@@ -215,7 +215,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
finalReduceConf.writeXml(finalReduceOutputStream);
String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
- byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
+ UserPayload finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
Vertex finalReduceVertex = new Vertex("finalreduce",
new ProcessorDescriptor(
ReduceProcessor.class.getName())
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
index aa7b836..f2b7043 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.examples.FilterLinesByWord;
import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index 8e5702f..2a0a1e0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -30,6 +30,7 @@ import java.util.Map.Entry;
import java.util.TreeMap;
import com.google.common.collect.Maps;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -90,6 +91,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
@@ -411,9 +413,8 @@ public class YARNRunner implements ClientProtocol {
stageConf.set(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX, "part");
- byte[] vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
- Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName).
- setUserPayload(vertexUserPayload),
+ UserPayload vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
+ Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName).setUserPayload(vertexUserPayload),
numTasks, taskResource);
if (isMap) {
vertex.addDataSource("MRInput",
@@ -807,7 +808,7 @@ public class YARNRunner implements ClientProtocol {
private static class MRInputHelpersInternal extends MRInputHelpers {
- protected static byte[] createMRInputPayload(Configuration conf,
+ protected static UserPayload createMRInputPayload(Configuration conf,
MRRuntimeProtos.MRSplitsProto mrSplitsProto) throws
IOException {
return MRInputHelpers.createMRInputPayload(conf, mrSplitsProto);
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index 1f6c8bd..021c82d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -59,14 +60,14 @@ public class MROutputCommitter extends OutputCommitter {
@Override
public void initialize() throws IOException {
- byte[] userPayload = getContext().getOutputUserPayload();
- if (userPayload == null) {
+ UserPayload userPayload = getContext().getOutputUserPayload();
+ if (!userPayload.hasPayload()) {
jobConf = new JobConf();
} else {
jobConf = new JobConf(
- MRHelpers.createConfFromUserPayload(getContext().getOutputUserPayload()));
+ MRHelpers.createConfFromUserPayload(userPayload));
}
-
+
// Read all credentials into the credentials instance stored in JobConf.
jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
@@ -113,12 +114,6 @@ public class MROutputCommitter extends OutputCommitter {
LOG.info("Using mapred newApiCommitter.");
}
- LOG.info("OutputCommitter set in config for outputName="
- + context.getOutputName()
- + ", vertexName=" + context.getVertexName()
- + ", outputCommitterClass="
- + jobConf.get("mapred.output.committer.class"));
-
if (newApiCommitter) {
TaskAttemptID taskAttemptID = new TaskAttemptID(
Long.toString(context.getApplicationId().getClusterTimestamp()),
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index df942cc..c03d4bb 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -22,12 +22,14 @@ import java.util.List;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 888ed1c..c100177 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezYARNUtils;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.combine.MRCombiner;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
@@ -338,7 +339,7 @@ public class MRHelpers {
@LimitedPrivate("Hive, Pig")
@Unstable
- public static byte[] createUserPayloadFromConf(Configuration conf)
+ public static UserPayload createUserPayloadFromConf(Configuration conf)
throws IOException {
return TezUtils.createUserPayloadFromConf(conf);
}
@@ -351,9 +352,9 @@ public class MRHelpers {
@LimitedPrivate("Hive, Pig")
@Unstable
- public static Configuration createConfFromUserPayload(byte[] bb)
+ public static Configuration createConfFromUserPayload(UserPayload payload)
throws IOException {
- return TezUtils.createConfFromUserPayload(bb);
+ return TezUtils.createConfFromUserPayload(payload);
}
@LimitedPrivate("Hive, Pig")
@@ -362,7 +363,6 @@ public class MRHelpers {
return TezUtils.createConfFromByteString(bs);
}
-
/**
* Extract the map task's container resource requirements from the
* provided configuration.
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index 6e411b3..1a4af9c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -34,6 +34,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -61,6 +62,7 @@ import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.input.MRInputLegacy;
@@ -132,9 +134,9 @@ public class MRInputHelpers {
* @throws IOException
*/
@InterfaceStability.Evolving
- public static MRRuntimeProtos.MRInputUserPayloadProto parseMRInputPayload(byte[] bytes)
+ public static MRRuntimeProtos.MRInputUserPayloadProto parseMRInputPayload(UserPayload payload)
throws IOException {
- return MRRuntimeProtos.MRInputUserPayloadProto.parseFrom(bytes);
+ return MRRuntimeProtos.MRInputUserPayloadProto.parseFrom(payload.getPayload());
}
/**
@@ -147,6 +149,7 @@ public class MRInputHelpers {
* @return an instance of the split
* @throws java.io.IOException
*/
+ @SuppressWarnings("unchecked")
@InterfaceStability.Evolving
public static InputSplit createOldFormatSplitFromUserPayload(
MRRuntimeProtos.MRSplitProto splitProto, SerializationFactory serializationFactory)
@@ -657,7 +660,7 @@ public class MRInputHelpers {
* or {@link org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat}
*/
@InterfaceAudience.Private
- protected static byte[] createMRInputPayloadWithGrouping(Configuration conf) throws IOException {
+ protected static UserPayload createMRInputPayloadWithGrouping(Configuration conf) throws IOException {
Preconditions
.checkArgument(conf != null, "Configuration must be specified");
return createMRInputPayload(TezUtils.createByteStringFromConf(conf),
@@ -665,7 +668,7 @@ public class MRInputHelpers {
}
@InterfaceAudience.Private
- protected static byte[] createMRInputPayload(Configuration conf,
+ protected static UserPayload createMRInputPayload(Configuration conf,
MRRuntimeProtos.MRSplitsProto mrSplitsProto) throws
IOException {
Preconditions
@@ -675,7 +678,7 @@ public class MRInputHelpers {
mrSplitsProto, false);
}
- private static byte[] createMRInputPayload(ByteString bytes,
+ private static UserPayload createMRInputPayload(ByteString bytes,
MRRuntimeProtos.MRSplitsProto mrSplitsProto,
boolean isGrouped) throws IOException {
MRRuntimeProtos.MRInputUserPayloadProto.Builder userPayloadBuilder =
@@ -688,7 +691,7 @@ public class MRInputHelpers {
userPayloadBuilder.setGroupingEnabled(isGrouped);
// TODO Should this be a ByteBuffer or a byte array ? A ByteBuffer would be
// more efficient.
- return userPayloadBuilder.build().toByteArray();
+ return new UserPayload(userPayloadBuilder.build().toByteArray());
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 65b383c..6b8ed83 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -41,6 +41,7 @@ import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
@@ -207,8 +208,7 @@ public class MRInput extends MRInputBase {
}
MRHelpers.translateVertexConfToTez(inputConf);
MRHelpers.doJobClientMagic(inputConf);
- byte[] payload = MRInputHelpersInternal.createMRInputPayload(inputConf,
- inputSplitInfo.getSplitsProto());
+ UserPayload payload = MRInputHelpersInternal.createMRInputPayload(inputConf, inputSplitInfo.getSplitsProto());
Credentials credentials = null;
if (getCredentialsForSourceFilesystem && inputSplitInfo.getCredentials() != null) {
credentials = inputSplitInfo.getCredentials();
@@ -229,7 +229,7 @@ public class MRInput extends MRInputBase {
Credentials credentials = maybeGetCredentials();
- byte[] payload = null;
+ UserPayload payload = null;
if (groupSplitsInAM) {
payload = MRInputHelpersInternal.createMRInputPayloadWithGrouping(inputConf);
} else {
@@ -248,7 +248,7 @@ public class MRInput extends MRInputBase {
Credentials credentials = maybeGetCredentials();
- byte[] payload = null;
+ UserPayload payload = null;
if (groupSplitsInAM) {
payload = MRInputHelpersInternal.createMRInputPayloadWithGrouping(inputConf);
} else {
@@ -512,12 +512,12 @@ public class MRInput extends MRInputBase {
private static class MRInputHelpersInternal extends MRInputHelpers {
- protected static byte[] createMRInputPayloadWithGrouping(Configuration conf) throws
+ protected static UserPayload createMRInputPayloadWithGrouping(Configuration conf) throws
IOException {
return MRInputHelpers.createMRInputPayloadWithGrouping(conf);
}
- protected static byte[] createMRInputPayload(Configuration conf,
+ protected static UserPayload createMRInputPayload(Configuration conf,
MRRuntimeProtos.MRSplitsProto mrSplitsProto) throws
IOException {
return MRInputHelpers.createMRInputPayload(conf, mrSplitsProto);
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
index 026919e..b7687e6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
@@ -19,6 +19,7 @@
package org.apache.tez.mapreduce.input.base;
import com.google.common.base.Preconditions;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
@@ -28,6 +29,7 @@ import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 7410a10..1add54d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.committer.MROutputCommitter;
import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -154,7 +155,7 @@ public class MROutput extends AbstractLogicalOutput {
* @return
* @throws IOException
*/
- private byte[] createUserPayload(Configuration conf,
+ private UserPayload createUserPayload(Configuration conf,
String outputFormatName, boolean useNewApi) {
Configuration outputConf = new JobConf(conf);
outputConf.setBoolean("mapred.reducer.new-api", useNewApi);
@@ -235,8 +236,7 @@ public class MROutput extends AbstractLogicalOutput {
taskNumberFormat.setGroupingUsed(false);
nonTaskNumberFormat.setMinimumIntegerDigits(3);
nonTaskNumberFormat.setGroupingUsed(false);
- Configuration conf = TezUtils.createConfFromUserPayload(
- getContext().getUserPayload());
+ Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
this.jobConf = new JobConf(conf);
// Add tokens to the jobConf - in case they are accessed within the RW / OF
jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 4e22ca0..6caed26 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -68,6 +68,7 @@ import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
import org.apache.tez.mapreduce.hadoop.IDConverter;
@@ -141,7 +142,7 @@ public abstract class MRTask extends AbstractLogicalIOProcessor {
processorContext.getTaskIndex()),
processorContext.getTaskAttemptNumber());
- byte[] userPayload = processorContext.getUserPayload();
+ UserPayload userPayload = processorContext.getUserPayload();
Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
if (conf instanceof JobConf) {
this.jobConf = (JobConf)conf;
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
index fc8a72f..d9ff1b1 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
@@ -18,6 +18,8 @@
package org.apache.tez.mapreduce.common;
+import org.apache.tez.dag.api.UserPayload;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@@ -66,7 +68,7 @@ public class TestMRInputSplitDistributor {
MRInputUserPayloadProto.Builder payloadProto = MRInputUserPayloadProto.newBuilder();
payloadProto.setSplits(splitsProtoBuilder.build());
payloadProto.setConfigurationBytes(confByteString);
- byte[] userPayload = payloadProto.build().toByteArray();
+ UserPayload userPayload = new UserPayload(payloadProto.build().toByteArray());
InputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload);
MRInputSplitDistributor splitDist = new MRInputSplitDistributor(context);
@@ -114,7 +116,7 @@ public class TestMRInputSplitDistributor {
MRInputUserPayloadProto.Builder payloadProto = MRInputUserPayloadProto.newBuilder();
payloadProto.setSplits(splitsProtoBuilder.build());
payloadProto.setConfigurationBytes(confByteString);
- byte[] userPayload = payloadProto.build().toByteArray();
+ UserPayload userPayload = new UserPayload(payloadProto.build().toByteArray());
InputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload);
MRInputSplitDistributor splitDist = new MRInputSplitDistributor(context);
@@ -146,11 +148,11 @@ public class TestMRInputSplitDistributor {
InputInitializerContext {
private final ApplicationId appId;
- private final byte[] payload;
+ private final UserPayload payload;
- TezRootInputInitializerContextForTest(byte[] payload) throws IOException {
+ TezRootInputInitializerContextForTest(UserPayload payload) throws IOException {
appId = ApplicationId.newInstance(1000, 200);
- this.payload = payload;
+ this.payload = payload == null ? new UserPayload(null) : payload;
}
@Override
@@ -169,7 +171,7 @@ public class TestMRInputSplitDistributor {
}
@Override
- public byte[] getInputUserPayload() {
+ public UserPayload getInputUserPayload() {
return payload;
}
@@ -204,7 +206,7 @@ public class TestMRInputSplitDistributor {
}
@Override
- public byte[] getUserPayload() {
+ public UserPayload getUserPayload() {
throw new UnsupportedOperationException("getUserPayload not implemented in this mock");
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index e2ace27..321ec75 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
@@ -109,8 +110,7 @@ public class TestMultiMRInput {
assertEquals(1, splits.length);
MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]);
- InputDataInformationEvent event = new InputDataInformationEvent(0,
- splitProto.toByteArray());
+ InputDataInformationEvent event = new InputDataInformationEvent(0, splitProto.toByteArray());
eventList.clear();
eventList.add(event);
@@ -169,12 +169,10 @@ public class TestMultiMRInput {
assertEquals(2, splits.length);
MRSplitProto splitProto1 = MRInputHelpers.createSplitProto(splits[0]);
- InputDataInformationEvent event1 = new InputDataInformationEvent(0,
- splitProto1.toByteArray());
+ InputDataInformationEvent event1 = new InputDataInformationEvent(0, splitProto1.toByteArray());
MRSplitProto splitProto2 = MRInputHelpers.createSplitProto(splits[1]);
- InputDataInformationEvent event2 = new InputDataInformationEvent(0,
- splitProto2.toByteArray());
+ InputDataInformationEvent event2 = new InputDataInformationEvent(0, splitProto2.toByteArray());
eventList.clear();
eventList.add(event1);
@@ -222,10 +220,8 @@ public class TestMultiMRInput {
assertEquals(1, splits.length);
MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]);
- InputDataInformationEvent event1 = new InputDataInformationEvent(0,
- splitProto.toByteArray());
- InputDataInformationEvent event2 = new InputDataInformationEvent(1,
- splitProto.toByteArray());
+ InputDataInformationEvent event1 = new InputDataInformationEvent(0, splitProto.toByteArray());
+ InputDataInformationEvent event2 = new InputDataInformationEvent(1, splitProto.toByteArray());
eventList.clear();
eventList.add(event1);
@@ -254,7 +250,7 @@ public class TestMultiMRInput {
doReturn(1).when(inputContext).getTaskIndex();
doReturn(1).when(inputContext).getTaskVertexIndex();
doReturn("taskVertexName").when(inputContext).getTaskVertexName();
- doReturn(payload).when(inputContext).getUserPayload();
+ doReturn(new UserPayload(payload)).when(inputContext).getUserPayload();
return inputContext;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 3731c64..79c69ab 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -53,6 +53,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 80a2a30..5727260 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -35,6 +35,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.TestUmbilical;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -123,9 +124,9 @@ public class TestMapProcessor {
InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
new InputDescriptor(MRInputLegacy.class.getName())
- .setUserPayload(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
+ .setUserPayload(new UserPayload(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf)).build()
- .toByteArray()),
+ .toByteArray())),
1);
OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex",
new OutputDescriptor(LocalOnFileSorterOutput.class.getName())
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 89946c6..493ee57 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -43,6 +43,7 @@ import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.TestUmbilical;
import org.apache.tez.mapreduce.TezTestUtils;
import org.apache.tez.mapreduce.hadoop.IDConverter;
@@ -132,9 +133,9 @@ public class TestReduceProcessor {
InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
new InputDescriptor(MRInputLegacy.class.getName())
- .setUserPayload(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
+ .setUserPayload(new UserPayload(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
.setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf)).build()
- .toByteArray()),
+ .toByteArray())),
1);
OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex",
new OutputDescriptor(LocalOnFileSorterOutput.class.getName()).
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
index 803d9e2..b682941 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
@@ -19,11 +19,14 @@
package org.apache.tez.common;
import com.google.protobuf.ByteString;
+
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.EventProtos;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.events.EventProtos.VertexManagerEventProto;
public class ProtoConverters {
@@ -64,8 +67,23 @@ public class ProtoConverters {
EventProtos.CompositeEventProto proto) {
return new CompositeDataMovementEvent(proto.getStartIndex(),
proto.getCount(),
- proto.hasUserPayload() ?
- proto.getUserPayload().toByteArray() : null);
+ proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null);
+ }
+
+ public static EventProtos.VertexManagerEventProto convertVertexManagerEventToProto(
+ VertexManagerEvent event) {
+ EventProtos.VertexManagerEventProto.Builder vmBuilder = VertexManagerEventProto.newBuilder();
+ vmBuilder.setTargetVertexName(event.getTargetVertexName());
+ if (event.getUserPayload() != null) {
+ vmBuilder.setUserPayload(ByteString.copyFrom(event.getUserPayload()));
+ }
+ return vmBuilder.build();
+ }
+
+ public static VertexManagerEvent convertVertexManagerEventFromProto(
+ EventProtos.VertexManagerEventProto vmProto) {
+ return new VertexManagerEvent(vmProto.getTargetVertexName(),
+ vmProto.hasUserPayload() ? vmProto.getUserPayload().toByteArray() : null);
}
public static EventProtos.RootInputDataInformationEventProto
@@ -84,8 +102,7 @@ public class ProtoConverters {
convertRootInputDataInformationEventFromProto(
EventProtos.RootInputDataInformationEventProto proto) {
InputDataInformationEvent diEvent = new InputDataInformationEvent(
- proto.getSourceIndex(), proto.getUserPayload() != null ? proto.getUserPayload()
- .toByteArray() : null);
+ proto.getSourceIndex(), proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null);
diEvent.setTargetIndex(proto.getTargetIndex());
return diEvent;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index fea1350..70bfad7 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -46,8 +46,6 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
-import com.google.protobuf.ByteString;
-
public class TezEvent implements Writable {
private EventType eventType;
@@ -139,13 +137,8 @@ public class TezEvent implements Writable {
(CompositeDataMovementEvent) event).toByteArray();
break;
case VERTEX_MANAGER_EVENT:
- VertexManagerEvent vmEvt = (VertexManagerEvent) event;
- VertexManagerEventProto.Builder vmBuilder = VertexManagerEventProto.newBuilder();
- vmBuilder.setTargetVertexName(vmEvt.getTargetVertexName());
- if (vmEvt.getUserPayload() != null) {
- vmBuilder.setUserPayload(ByteString.copyFrom(vmEvt.getUserPayload()));
- }
- eventBytes = vmBuilder.build().toByteArray();
+ eventBytes = ProtoConverters.convertVertexManagerEventToProto((VertexManagerEvent) event)
+ .toByteArray();
break;
case INPUT_READ_ERROR_EVENT:
InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
@@ -214,10 +207,8 @@ public class TezEvent implements Writable {
event = ProtoConverters.convertCompositeDataMovementEventFromProto(cProto);
break;
case VERTEX_MANAGER_EVENT:
- VertexManagerEventProto vmProto =
- VertexManagerEventProto.parseFrom(eventBytes);
- event = new VertexManagerEvent(vmProto.getTargetVertexName(),
- vmProto.getUserPayload() != null ? vmProto.getUserPayload().toByteArray() : null);
+ VertexManagerEventProto vmProto = VertexManagerEventProto.parseFrom(eventBytes);
+ event = ProtoConverters.convertVertexManagerEventFromProto(vmProto);
break;
case INPUT_READ_ERROR_EVENT:
InputReadErrorEventProto ideProto =
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index d39e5b3..4a0b646 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -31,11 +31,10 @@ import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUserPayload;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.InputReadyTracker;
import org.apache.tez.runtime.RuntimeTask;
@@ -49,7 +48,7 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor;
public class TezInputContextImpl extends TezTaskContextImpl
implements InputContext {
- private final TezUserPayload userPayload;
+ private final UserPayload userPayload;
private final String sourceVertexName;
private final EventMetaData sourceInfo;
private final int inputIndex;
@@ -61,7 +60,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
int appAttemptNumber,
TezUmbilical tezUmbilical, String dagName, String taskVertexName,
String sourceVertexName, TezTaskAttemptID taskAttemptID,
- TezCounters counters, int inputIndex, @Nullable byte[] userPayload,
+ TezCounters counters, int inputIndex, @Nullable UserPayload userPayload,
RuntimeTask runtimeTask,
Map<String, ByteBuffer> serviceConsumerMetadata,
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
@@ -75,7 +74,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
checkNotNull(sourceVertexName, "sourceVertexName is null");
checkNotNull(inputs, "input map is null");
checkNotNull(inputReadyTracker, "inputReadyTracker is null");
- this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+ this.userPayload = userPayload == null ? new UserPayload(null) : userPayload;
this.inputIndex = inputIndex;
this.sourceVertexName = sourceVertexName;
this.sourceInfo = new EventMetaData(
@@ -106,10 +105,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
tezUmbilical.addEvents(tezEvents);
}
- @Nullable
@Override
- public byte[] getUserPayload() {
- return userPayload.getPayload();
+ public UserPayload getUserPayload() {
+ return userPayload;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
index f43282a..f0cecc2 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
@@ -5,25 +5,22 @@ import static com.google.common.base.Preconditions.checkNotNull;
import javax.annotation.Nullable;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.tez.common.TezUserPayload;
-import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.InputReadyTracker;
-import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.MergedLogicalInput;
import org.apache.tez.runtime.api.MergedInputContext;
public class TezMergedInputContextImpl implements MergedInputContext {
- private final TezUserPayload userPayload;
+ private final UserPayload userPayload;
private final String groupInputName;
private final Map<String, MergedLogicalInput> groupInputsMap;
private final InputReadyTracker inputReadyTracker;
private final String[] workDirs;
- public TezMergedInputContextImpl(@Nullable byte[] userPayload, String groupInputName,
+ public TezMergedInputContextImpl(@Nullable UserPayload userPayload, String groupInputName,
Map<String, MergedLogicalInput> groupInputsMap,
InputReadyTracker inputReadyTracker, String[] workDirs) {
checkNotNull(groupInputName, "groupInputName is null");
@@ -31,15 +28,14 @@ public class TezMergedInputContextImpl implements MergedInputContext {
checkNotNull(inputReadyTracker, "inputReadyTracker is null");
this.groupInputName = groupInputName;
this.groupInputsMap = groupInputsMap;
- this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+ this.userPayload = userPayload == null ? new UserPayload(null) : userPayload;
this.inputReadyTracker = inputReadyTracker;
this.workDirs = workDirs;
}
- @Nullable
@Override
- public byte[] getUserPayload() {
- return userPayload.getPayload();
+ public UserPayload getUserPayload() {
+ return userPayload;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 2d0d606..8f1fe85 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -31,11 +31,10 @@ import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUserPayload;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.RuntimeTask;
import org.apache.tez.runtime.api.Event;
@@ -47,7 +46,7 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor;
public class TezOutputContextImpl extends TezTaskContextImpl
implements OutputContext {
- private final TezUserPayload userPayload;
+ private final UserPayload userPayload;
private final String destinationVertexName;
private final EventMetaData sourceInfo;
private final int outputIndex;
@@ -58,7 +57,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
String taskVertexName,
String destinationVertexName,
TezTaskAttemptID taskAttemptID, TezCounters counters, int outputIndex,
- @Nullable byte[] userPayload, RuntimeTask runtimeTask,
+ @Nullable UserPayload userPayload, RuntimeTask runtimeTask,
Map<String, ByteBuffer> serviceConsumerMetadata,
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
OutputDescriptor outputDescriptor, ObjectRegistry objectRegistry) {
@@ -68,7 +67,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
auxServiceEnv, memDist, outputDescriptor, objectRegistry);
checkNotNull(outputIndex, "outputIndex is null");
checkNotNull(destinationVertexName, "destinationVertexName is null");
- this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+ this.userPayload = userPayload == null ? new UserPayload(null) : userPayload;
this.outputIndex = outputIndex;
this.destinationVertexName = destinationVertexName;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
@@ -96,10 +95,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl
tezUmbilical.addEvents(tezEvents);
}
- @Nullable
@Override
- public byte[] getUserPayload() {
- return userPayload.getPayload();
+ public UserPayload getUserPayload() {
+ return userPayload;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index a20036e..eefa337 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -31,13 +31,10 @@ import java.util.Map;
import javax.annotation.Nullable;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUserPayload;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.InputReadyTracker;
import org.apache.tez.runtime.RuntimeTask;
@@ -50,16 +47,14 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor;
public class TezProcessorContextImpl extends TezTaskContextImpl implements ProcessorContext {
- private static final Log LOG = LogFactory.getLog(TezProcessorContextImpl.class);
-
- private final TezUserPayload userPayload;
+ private final UserPayload userPayload;
private final EventMetaData sourceInfo;
private final InputReadyTracker inputReadyTracker;
public TezProcessorContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber,
TezUmbilical tezUmbilical, String dagName, String vertexName,
TezTaskAttemptID taskAttemptID, TezCounters counters,
- @Nullable byte[] userPayload, RuntimeTask runtimeTask,
+ @Nullable UserPayload userPayload, RuntimeTask runtimeTask,
Map<String, ByteBuffer> serviceConsumerMetadata,
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry) {
@@ -67,7 +62,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
auxServiceEnv, memDist, processorDescriptor, objectRegistry);
checkNotNull(inputReadyTracker, "inputReadyTracker is null");
- this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+ this.userPayload = userPayload == null ? new UserPayload(null) : userPayload;
this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
taskVertexName, "", taskAttemptID);
this.inputReadyTracker = inputReadyTracker;
@@ -84,10 +79,9 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
tezUmbilical.addEvents(tezEvents);
}
- @Nullable
@Override
- public byte[] getUserPayload() {
- return userPayload.getPayload();
+ public UserPayload getUserPayload() {
+ return userPayload;
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 45aba9c..59db901 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezEntityDescriptor;
+import org.apache.tez.dag.api.EntityDescriptor;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.RuntimeTask;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
@@ -58,7 +58,7 @@ public abstract class TezTaskContextImpl implements TaskContext {
private final int appAttemptNumber;
private final Map<String, String> auxServiceEnv;
protected final MemoryDistributor initialMemoryDistributor;
- protected final TezEntityDescriptor<?> descriptor;
+ protected final EntityDescriptor<?> descriptor;
private final String dagName;
private final ObjectRegistry objectRegistry;
@@ -68,7 +68,7 @@ public abstract class TezTaskContextImpl implements TaskContext {
TezCounters counters, RuntimeTask runtimeTask,
TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata,
Map<String, String> auxServiceEnv, MemoryDistributor memDist,
- TezEntityDescriptor<?> descriptor, ObjectRegistry objectRegistry) {
+ EntityDescriptor<?> descriptor, ObjectRegistry objectRegistry) {
checkNotNull(conf, "conf is null");
checkNotNull(dagName, "dagName is null");
checkNotNull(taskVertexName, "taskVertexName is null");
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
index 6d58635..042f837 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezEntityDescriptor;
+import org.apache.tez.dag.api.EntityDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.InputContext;
@@ -93,7 +93,7 @@ public class MemoryDistributor {
* Used by the Tez framework to request memory on behalf of user requests.
*/
public void requestMemory(long requestSize, MemoryUpdateCallback callback,
- TaskContext taskContext, TezEntityDescriptor<?> descriptor) {
+ TaskContext taskContext, EntityDescriptor<?> descriptor) {
registerRequest(requestSize, callback, taskContext, descriptor);
}
@@ -154,7 +154,7 @@ public class MemoryDistributor {
}
private long registerRequest(long requestSize, MemoryUpdateCallback callback,
- TaskContext entityContext, TezEntityDescriptor<?> descriptor) {
+ TaskContext entityContext, EntityDescriptor<?> descriptor) {
Preconditions.checkArgument(requestSize >= 0);
Preconditions.checkNotNull(callback);
Preconditions.checkNotNull(entityContext);
@@ -211,7 +211,7 @@ public class MemoryDistributor {
private final InitialMemoryRequestContext requestContext;
public RequestorInfo(TaskContext taskContext, long requestSize,
- final MemoryUpdateCallback callback, TezEntityDescriptor<?> descriptor) {
+ final MemoryUpdateCallback callback, EntityDescriptor<?> descriptor) {
InitialMemoryRequestContext.ComponentType type;
String componentVertexName;
if (taskContext instanceof InputContext) {
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index d171365..27446bb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -36,6 +36,7 @@ import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -432,7 +433,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
remainderRangeForLastShuffler : basePartitionRange));
EdgeManagerPluginDescriptor edgeManagerDescriptor =
new EdgeManagerPluginDescriptor(CustomShuffleEdgeManager.class.getName());
- edgeManagerDescriptor.setUserPayload(edgeManagerConfig.toUserPayload());
+ edgeManagerDescriptor.setUserPayload(new UserPayload(edgeManagerConfig.toUserPayload()));
edgeManagers.put(vertex, edgeManagerDescriptor);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 8e98d1c..4ed05aa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -52,6 +52,7 @@ import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
index bf41a26..81c1185 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.conf;
import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.api.UserPayload;
@InterfaceAudience.Private
abstract class HadoopKeyValuesBasedBaseEdgeConfigurer {
@@ -29,7 +30,7 @@ abstract class HadoopKeyValuesBasedBaseEdgeConfigurer {
* Get the payload for the configured Output
* @return output configuration as a byte array
*/
- public abstract byte[] getOutputPayload();
+ public abstract UserPayload getOutputPayload();
/**
* Get the output class name
@@ -41,7 +42,7 @@ abstract class HadoopKeyValuesBasedBaseEdgeConfigurer {
* Get the payload for the configured Input
* @return input configuration as a byte array
*/
- public abstract byte[] getInputPayload();
+ public abstract UserPayload getInputPayload();
/**
* Get the input class name
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfigurer.java
index d2cd85f..09ffffb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfigurer.java
@@ -21,6 +21,7 @@
package org.apache.tez.runtime.library.conf;
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;
@@ -172,7 +174,7 @@ public class OnFileSortedOutputConfigurer {
*/
public byte[] toByteArray() {
try {
- return TezUtils.createUserPayloadFromConf(conf);
+ return TezUtils.createUserPayloadFromConf(conf).getPayload();
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -181,7 +183,7 @@ public class OnFileSortedOutputConfigurer {
@InterfaceAudience.Private
public void fromByteArray(byte[] payload) {
try {
- this.conf = TezUtils.createConfFromUserPayload(payload);
+ this.conf = TezUtils.createConfFromUserPayload(new UserPayload(payload));
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfigurer.java
index f84315f..0b2ce15 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfigurer.java
@@ -21,6 +21,7 @@
package org.apache.tez.runtime.library.conf;
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
@@ -108,7 +110,7 @@ public class OnFileUnorderedKVOutputConfigurer {
*/
public byte[] toByteArray() {
try {
- return TezUtils.createUserPayloadFromConf(conf);
+ return TezUtils.createUserPayloadFromConf(conf).getPayload();
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -117,7 +119,7 @@ public class OnFileUnorderedKVOutputConfigurer {
@InterfaceAudience.Private
public void fromByteArray(byte[] payload) {
try {
- this.conf = TezUtils.createConfFromUserPayload(payload);
+ this.conf = TezUtils.createConfFromUserPayload(new UserPayload(payload));
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfigurer.java
index 47f32ee..d86c76b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfigurer.java
@@ -21,6 +21,7 @@
package org.apache.tez.runtime.library.conf;
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
@@ -122,7 +124,7 @@ public class OnFileUnorderedPartitionedKVOutputConfigurer {
*/
public byte[] toByteArray() {
try {
- return TezUtils.createUserPayloadFromConf(conf);
+ return TezUtils.createUserPayloadFromConf(conf).getPayload();
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -131,7 +133,7 @@ public class OnFileUnorderedPartitionedKVOutputConfigurer {
@InterfaceAudience.Private
public void fromByteArray(byte[] payload) {
try {
- this.conf = TezUtils.createConfFromUserPayload(payload);
+ this.conf = TezUtils.createConfFromUserPayload(new UserPayload(payload));
} catch (IOException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
index 029d640..f0d06af 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
@@ -19,9 +19,11 @@
package org.apache.tez.runtime.library.conf;
import javax.annotation.Nullable;
+
import java.util.Map;
import com.google.common.base.Preconditions;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -29,6 +31,7 @@ import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.library.output.OnFileSortedOutput;
/**
@@ -85,8 +88,8 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
}
@Override
- public byte[] getOutputPayload() {
- return outputConf.toByteArray();
+ public UserPayload getOutputPayload() {
+ return new UserPayload(outputConf.toByteArray());
}
@Override
@@ -95,8 +98,8 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
}
@Override
- public byte[] getInputPayload() {
- return inputConf.toByteArray();
+ public UserPayload getInputPayload() {
+ return new UserPayload(inputConf.toByteArray());
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfigurer.java
index 6faa658..3320c87 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfigurer.java
@@ -21,6 +21,7 @@
package org.apache.tez.runtime.library.conf;
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.Map;
@@ -33,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.input.ShuffledMergedInput;
@@ -242,7 +244,7 @@ public class ShuffledMergedInputConfigurer {
*/
public byte[] toByteArray() {
try {
- return TezUtils.createUserPayloadFromConf(conf);
+ return TezUtils.createUserPayloadFromConf(conf).getPayload();
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -250,7 +252,7 @@ public class ShuffledMergedInputConfigurer {
public void fromByteArray(byte[] payload) {
try {
- this.conf = TezUtils.createConfFromUserPayload(payload);
+ this.conf = TezUtils.createConfFromUserPayload(new UserPayload(payload));
} catch (IOException e) {
throw new RuntimeException(e);
}