You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/08/13 01:56:46 UTC

[2/3] TEZ-1194. Make TezUserPayload user facing for payload specification (Tsuyoshi Ozawa and bikas)

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 6aa5b99..e10276c 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -303,8 +304,8 @@ public class TestHistoryEventsProtoConversion {
       Map<String,EdgeManagerPluginDescriptor> sourceEdgeManagers
           = new LinkedHashMap<String, EdgeManagerPluginDescriptor>();
       sourceEdgeManagers.put("foo", new EdgeManagerPluginDescriptor("bar"));
-      sourceEdgeManagers.put("foo1", new EdgeManagerPluginDescriptor("bar1").setUserPayload(
-          new String("payload").getBytes()));
+      sourceEdgeManagers.put("foo1", new EdgeManagerPluginDescriptor("bar1")
+          .setUserPayload(new UserPayload(new String("payload").getBytes(), 100)));
       VertexParallelismUpdatedEvent event =
           new VertexParallelismUpdatedEvent(
               TezVertexID.getInstance(
@@ -322,12 +323,14 @@ public class TestHistoryEventsProtoConversion {
           deserializedEvent.getSourceEdgeManagers().size());
       Assert.assertEquals(event.getSourceEdgeManagers().get("foo").getClassName(),
           deserializedEvent.getSourceEdgeManagers().get("foo").getClassName());
-      Assert.assertArrayEquals(event.getSourceEdgeManagers().get("foo").getUserPayload(),
-          deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload());
+      Assert.assertNotNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload());
+      Assert.assertNull(deserializedEvent.getSourceEdgeManagers().get("foo").getUserPayload().getPayload());
       Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getClassName(),
           deserializedEvent.getSourceEdgeManagers().get("foo1").getClassName());
-      Assert.assertArrayEquals(event.getSourceEdgeManagers().get("foo1").getUserPayload(),
-          deserializedEvent.getSourceEdgeManagers().get("foo1").getUserPayload());
+      Assert.assertEquals(event.getSourceEdgeManagers().get("foo1").getUserPayload().getVersion(),
+          deserializedEvent.getSourceEdgeManagers().get("foo1").getUserPayload().getVersion());
+      Assert.assertArrayEquals(event.getSourceEdgeManagers().get("foo1").getUserPayload().getPayload(),
+          deserializedEvent.getSourceEdgeManagers().get("foo1").getUserPayload().getPayload());
       Assert.assertEquals(event.getVertexLocationHint(),
           deserializedEvent.getVertexLocationHint());
       logEvents(event, deserializedEvent);
@@ -537,8 +540,8 @@ public class TestHistoryEventsProtoConversion {
       // Expected
     }
     List<TezEvent> events =
-        Arrays.asList(new TezEvent(new DataMovementEvent(1, null), new EventMetaData(
-            EventProducerConsumerType.SYSTEM, "foo", "bar", null)));
+        Arrays.asList(new TezEvent(new DataMovementEvent(1, null),
+            new EventMetaData(EventProducerConsumerType.SYSTEM, "foo", "bar", null)));
     event = new VertexDataMovementEventsGeneratedEvent(
             TezVertexID.getInstance(
                 TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 1), events);

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
index f4723d2..fde6db3 100644
--- a/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
+++ b/tez-dag/src/test/java/org/apache/tez/runtime/task/TestTaskExecution.java
@@ -56,6 +56,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
@@ -395,7 +396,7 @@ public class TestTaskExecution {
 
     @Override
     public void initialize() throws Exception {
-      parseConf(getContext().getUserPayload());
+      parseConf(getContext().getUserPayload().getPayload());
     }
 
     private void parseConf(byte[] bytes) {
@@ -692,7 +693,7 @@ public class TestTaskExecution {
     TezTaskID taskId = TezTaskID.getInstance(vertexId, 1);
     TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 1);
     ProcessorDescriptor processorDescriptor = new ProcessorDescriptor(processorClass)
-        .setUserPayload(processorConf);
+        .setUserPayload(new UserPayload(processorConf));
     TaskSpec taskSpec = new TaskSpec(taskAttemptId, "dagName", "vertexName", processorDescriptor,
         new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
index e34001e..e445359 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java
@@ -39,6 +39,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.client.DAGClient;
@@ -69,7 +70,7 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
           .next();
       KeyValueWriter kvWriter = (KeyValueWriter) output.getWriter();
       kvWriter.write(word, new IntWritable(getContext().getTaskIndex()));
-      byte[] userPayload = getContext().getUserPayload();
+      byte[] userPayload = getContext().getUserPayload().getPayload();
       if (userPayload != null) {
         boolean doLocalityCheck = userPayload[0] > 0 ? true : false;
         if (doLocalityCheck) {
@@ -101,8 +102,8 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
       while (inputKvReader.next()) {
         sum += ((IntWritable) inputKvReader.getCurrentValue()).get();
       }
-      boolean doLocalityCheck = getContext().getUserPayload()[0] > 0 ? true : false;
-      int broadcastSum = getContext().getUserPayload()[1];
+      boolean doLocalityCheck = getContext().getUserPayload().getPayload()[0] > 0 ? true : false;
+      int broadcastSum = getContext().getUserPayload().getPayload()[1];
       int expectedSum = broadcastSum + getContext().getTaskIndex();
       System.out.println("Index: " + getContext().getTaskIndex() + 
           " sum: " + sum + " expectedSum: " + expectedSum + " broadcastSum: " + broadcastSum);
@@ -139,7 +140,8 @@ public class BroadcastAndOneToOneExample extends Configured implements Tool {
         numOneToOneTasks = 1;
       }
     }
-    byte[] procPayload = {(byte) (doLocalityCheck ? 1 : 0), 1};
+    byte[] procByte = {(byte) (doLocalityCheck ? 1 : 0), 1};
+    UserPayload procPayload = new UserPayload(procByte);
 
     System.out.println("Using " + numOneToOneTasks + " 1-1 tasks");
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 50db89a..3a081bb 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -62,6 +62,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
@@ -166,7 +167,7 @@ public class FilterLinesByWord extends Configured implements Tool {
     stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
     stage2Conf.setBoolean("mapred.mapper.new-api", false);
 
-    byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+    UserPayload stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
     // Setup stage1 Vertex
     Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
         FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
@@ -186,8 +187,8 @@ public class FilterLinesByWord extends Configured implements Tool {
 
     // Setup stage2 Vertex
     Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
-        FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
-        .createUserPayloadFromConf(stage2Conf)), 1);
+        FilterByWordOutputProcessor.class.getName()).setUserPayload(
+        MRHelpers.createUserPayloadFromConf(stage2Conf)), 1);
     stage2Vertex.setTaskLocalFiles(commonLocalResources);
 
     // Configure the Output for stage2

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index 0040f02..a77713f 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -55,6 +55,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
@@ -157,7 +158,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
     stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
     stage2Conf.setBoolean("mapred.mapper.new-api", false);
 
-    byte[] stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+    UserPayload stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
     // Setup stage1 Vertex
     Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
         FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
index a6a5dbd..fda61b0 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectDataGen.java
@@ -43,6 +43,7 @@ import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
@@ -199,8 +200,8 @@ public class IntersectDataGen extends Configured implements Tool {
     DAG dag = new DAG("IntersectDataGen");
 
     Vertex genDataVertex = new Vertex("datagen", new ProcessorDescriptor(
-        GenDataProcessor.class.getName()).setUserPayload(GenDataProcessor.createConfiguration(
-        largeOutSizePerTask, smallOutSizePerTask)), numTasks);
+        GenDataProcessor.class.getName()).setUserPayload(
+        new UserPayload(GenDataProcessor.createConfiguration(largeOutSizePerTask, smallOutSizePerTask))), numTasks);
     genDataVertex.addDataSink(STREAM_OUTPUT_NAME, 
         MROutput.createConfigurer(new Configuration(tezConf),
             TextOutputFormat.class, largeOutPath.toUri().toString()).create());
@@ -241,7 +242,7 @@ public class IntersectDataGen extends Configured implements Tool {
 
     @Override
     public void initialize() throws Exception {
-      byte[] payload = getContext().getUserPayload();
+      byte[] payload = getContext().getUserPayload().getPayload();
       ByteArrayInputStream bis = new ByteArrayInputStream(payload);
       DataInputStream dis = new DataInputStream(bis);
       streamOutputFileSize = dis.readLong();

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 4380840..0562633 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.common.collect.Maps;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -69,6 +70,7 @@ import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
@@ -82,6 +84,7 @@ import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 
 /**
@@ -517,7 +520,7 @@ public class MRRSleepJob extends Configured implements Tool {
     List<Vertex> vertices = new ArrayList<Vertex>();
 
     
-    byte[] mapUserPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
+    UserPayload mapUserPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
     int numTasks = generateSplitsInAM ? -1 : numMapper;
 
     Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
@@ -531,7 +534,7 @@ public class MRRSleepJob extends Configured implements Tool {
       for (int i = 0; i < iReduceStagesCount; ++i) {
         Configuration iconf =
             intermediateReduceStageConfs[i];
-        byte[] iReduceUserPayload = MRHelpers.createUserPayloadFromConf(iconf);
+        UserPayload iReduceUserPayload = MRHelpers.createUserPayloadFromConf(iconf);
         Vertex ivertex = new Vertex("ireduce" + (i+1),
                 new ProcessorDescriptor(ReduceProcessor.class.getName()).
                 setUserPayload(iReduceUserPayload), numIReducer);
@@ -542,7 +545,7 @@ public class MRRSleepJob extends Configured implements Tool {
 
     Vertex finalReduceVertex = null;
     if (numReducer > 0) {
-      byte[] reducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
+      UserPayload reducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
       finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
           ReduceProcessor.class.getName()).setUserPayload(reducePayload), numReducer);
       finalReduceVertex.setTaskLocalFiles(commonLocalResources);

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index 5578665..255dcbd 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -61,6 +61,7 @@ import org.apache.tez.dag.api.Edge;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
@@ -184,7 +185,6 @@ public class TestOrderedWordCount extends Configured implements Tool {
     ByteArrayOutputStream outputStream = new ByteArrayOutputStream(4096);
     mapStageConf.writeXml(outputStream);
     String mapStageHistoryText = new String(outputStream.toByteArray(), "UTF-8");
-
     DataSourceDescriptor dsd;
     if (generateSplitsInClient) {
       mapStageConf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR,
@@ -215,7 +215,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
     ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
     finalReduceConf.writeXml(finalReduceOutputStream);
     String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
-    byte[] finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
+    UserPayload finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
     Vertex finalReduceVertex = new Vertex("finalreduce",
         new ProcessorDescriptor(
             ReduceProcessor.class.getName())

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
index aa7b836..f2b7043 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.examples.FilterLinesByWord;
 import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index 8e5702f..2a0a1e0 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -30,6 +30,7 @@ import java.util.Map.Entry;
 import java.util.TreeMap;
 
 import com.google.common.collect.Maps;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -90,6 +91,7 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
@@ -411,9 +413,8 @@ public class YARNRunner implements ClientProtocol {
     
     stageConf.set(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX, "part");
     
-    byte[] vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
-    Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName).
-        setUserPayload(vertexUserPayload),
+    UserPayload vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
+    Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName).setUserPayload(vertexUserPayload),
         numTasks, taskResource);
     if (isMap) {
       vertex.addDataSource("MRInput",
@@ -807,7 +808,7 @@ public class YARNRunner implements ClientProtocol {
 
   private static class MRInputHelpersInternal extends MRInputHelpers {
 
-    protected static byte[] createMRInputPayload(Configuration conf,
+    protected static UserPayload createMRInputPayload(Configuration conf,
                                                  MRRuntimeProtos.MRSplitsProto mrSplitsProto) throws
         IOException {
       return MRInputHelpers.createMRInputPayload(conf, mrSplitsProto);

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index 1f6c8bd..021c82d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.client.VertexStatus;
 import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -59,14 +60,14 @@ public class MROutputCommitter extends OutputCommitter {
 
   @Override
   public void initialize() throws IOException {
-    byte[] userPayload = getContext().getOutputUserPayload();
-    if (userPayload == null) {
+    UserPayload userPayload = getContext().getOutputUserPayload();
+    if (!userPayload.hasPayload()) {
       jobConf = new JobConf();
     } else {
       jobConf = new JobConf(
-          MRHelpers.createConfFromUserPayload(getContext().getOutputUserPayload()));
+          MRHelpers.createConfFromUserPayload(userPayload));
     }
-
+    
     // Read all credentials into the credentials instance stored in JobConf.
     jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
@@ -113,12 +114,6 @@ public class MROutputCommitter extends OutputCommitter {
       LOG.info("Using mapred newApiCommitter.");
     }
 
-    LOG.info("OutputCommitter set in config for outputName="
-        + context.getOutputName()
-        + ", vertexName=" + context.getVertexName()
-        + ", outputCommitterClass="
-        + jobConf.get("mapred.output.committer.class"));
-
     if (newApiCommitter) {
       TaskAttemptID taskAttemptID = new TaskAttemptID(
           Long.toString(context.getApplicationId().getClusterTimestamp()),

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index df942cc..c03d4bb 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -22,12 +22,14 @@ import java.util.List;
 
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 888ed1c..c100177 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.TezYARNUtils;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.combine.MRCombiner;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
@@ -338,7 +339,7 @@ public class MRHelpers {
 
   @LimitedPrivate("Hive, Pig")
   @Unstable
-  public static byte[] createUserPayloadFromConf(Configuration conf)
+  public static UserPayload createUserPayloadFromConf(Configuration conf)
       throws IOException {
     return TezUtils.createUserPayloadFromConf(conf);
   }
@@ -351,9 +352,9 @@ public class MRHelpers {
 
   @LimitedPrivate("Hive, Pig")
   @Unstable
-  public static Configuration createConfFromUserPayload(byte[] bb)
+  public static Configuration createConfFromUserPayload(UserPayload payload)
       throws IOException {
-    return TezUtils.createConfFromUserPayload(bb);
+    return TezUtils.createConfFromUserPayload(payload);
   }
 
   @LimitedPrivate("Hive, Pig")
@@ -362,7 +363,6 @@ public class MRHelpers {
     return TezUtils.createConfFromByteString(bs);
   }
 
-
   /**
    * Extract the map task's container resource requirements from the
    * provided configuration.

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index 6e411b3..1a4af9c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -34,6 +34,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -61,6 +62,7 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
@@ -132,9 +134,9 @@ public class MRInputHelpers {
    * @throws IOException
    */
   @InterfaceStability.Evolving
-  public static MRRuntimeProtos.MRInputUserPayloadProto parseMRInputPayload(byte[] bytes)
+  public static MRRuntimeProtos.MRInputUserPayloadProto parseMRInputPayload(UserPayload payload)
       throws IOException {
-    return MRRuntimeProtos.MRInputUserPayloadProto.parseFrom(bytes);
+    return MRRuntimeProtos.MRInputUserPayloadProto.parseFrom(payload.getPayload());
   }
 
   /**
@@ -147,6 +149,7 @@ public class MRInputHelpers {
    * @return an instance of the split
    * @throws java.io.IOException
    */
+  @SuppressWarnings("unchecked")
   @InterfaceStability.Evolving
   public static InputSplit createOldFormatSplitFromUserPayload(
       MRRuntimeProtos.MRSplitProto splitProto, SerializationFactory serializationFactory)
@@ -657,7 +660,7 @@ public class MRInputHelpers {
    * or {@link org.apache.hadoop.mapreduce.split.TezGroupedSplitsInputFormat}
    */
   @InterfaceAudience.Private
-  protected static byte[] createMRInputPayloadWithGrouping(Configuration conf) throws IOException {
+  protected static UserPayload createMRInputPayloadWithGrouping(Configuration conf) throws IOException {
     Preconditions
         .checkArgument(conf != null, "Configuration must be specified");
     return createMRInputPayload(TezUtils.createByteStringFromConf(conf),
@@ -665,7 +668,7 @@ public class MRInputHelpers {
   }
 
   @InterfaceAudience.Private
-  protected static byte[] createMRInputPayload(Configuration conf,
+  protected static UserPayload createMRInputPayload(Configuration conf,
                                                MRRuntimeProtos.MRSplitsProto mrSplitsProto) throws
       IOException {
     Preconditions
@@ -675,7 +678,7 @@ public class MRInputHelpers {
         mrSplitsProto, false);
   }
 
-  private static byte[] createMRInputPayload(ByteString bytes,
+  private static UserPayload createMRInputPayload(ByteString bytes,
                                              MRRuntimeProtos.MRSplitsProto mrSplitsProto,
                                              boolean isGrouped) throws IOException {
     MRRuntimeProtos.MRInputUserPayloadProto.Builder userPayloadBuilder =
@@ -688,7 +691,7 @@ public class MRInputHelpers {
     userPayloadBuilder.setGroupingEnabled(isGrouped);
     // TODO Should this be a ByteBuffer or a byte array ? A ByteBuffer would be
     // more efficient.
-    return userPayloadBuilder.build().toByteArray();
+    return new UserPayload(userPayloadBuilder.build().toByteArray());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 65b383c..6b8ed83 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -41,6 +41,7 @@ import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
@@ -207,8 +208,7 @@ public class MRInput extends MRInputBase {
       }
       MRHelpers.translateVertexConfToTez(inputConf);
       MRHelpers.doJobClientMagic(inputConf);
-      byte[] payload = MRInputHelpersInternal.createMRInputPayload(inputConf,
-          inputSplitInfo.getSplitsProto());
+      UserPayload payload = MRInputHelpersInternal.createMRInputPayload(inputConf, inputSplitInfo.getSplitsProto());
       Credentials credentials = null;
       if (getCredentialsForSourceFilesystem && inputSplitInfo.getCredentials() != null) {
         credentials = inputSplitInfo.getCredentials();
@@ -229,7 +229,7 @@ public class MRInput extends MRInputBase {
 
       Credentials credentials = maybeGetCredentials();
 
-      byte[] payload = null;
+      UserPayload payload = null;
       if (groupSplitsInAM) {
         payload = MRInputHelpersInternal.createMRInputPayloadWithGrouping(inputConf);
       } else {
@@ -248,7 +248,7 @@ public class MRInput extends MRInputBase {
       
       Credentials credentials = maybeGetCredentials();
 
-      byte[] payload = null;
+      UserPayload payload = null;
       if (groupSplitsInAM) {
         payload = MRInputHelpersInternal.createMRInputPayloadWithGrouping(inputConf);
       } else {
@@ -512,12 +512,12 @@ public class MRInput extends MRInputBase {
 
   private static class MRInputHelpersInternal extends MRInputHelpers {
 
-    protected static byte[] createMRInputPayloadWithGrouping(Configuration conf) throws
+    protected static UserPayload createMRInputPayloadWithGrouping(Configuration conf) throws
         IOException {
       return MRInputHelpers.createMRInputPayloadWithGrouping(conf);
     }
 
-    protected static byte[] createMRInputPayload(Configuration conf,
+    protected static UserPayload createMRInputPayload(Configuration conf,
                                                  MRRuntimeProtos.MRSplitsProto mrSplitsProto) throws
         IOException {
       return MRInputHelpers.createMRInputPayload(conf, mrSplitsProto);

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
index 026919e..b7687e6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/base/MRInputBase.java
@@ -19,6 +19,7 @@
 package org.apache.tez.mapreduce.input.base;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
@@ -28,6 +29,7 @@ import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 7410a10..1add54d 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.api.DataSinkDescriptor;
 import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -154,7 +155,7 @@ public class MROutput extends AbstractLogicalOutput {
      * @return
      * @throws IOException
      */
-    private byte[] createUserPayload(Configuration conf, 
+    private UserPayload createUserPayload(Configuration conf,
         String outputFormatName, boolean useNewApi) {
       Configuration outputConf = new JobConf(conf);
       outputConf.setBoolean("mapred.reducer.new-api", useNewApi);
@@ -235,8 +236,7 @@ public class MROutput extends AbstractLogicalOutput {
     taskNumberFormat.setGroupingUsed(false);
     nonTaskNumberFormat.setMinimumIntegerDigits(3);
     nonTaskNumberFormat.setGroupingUsed(false);
-    Configuration conf = TezUtils.createConfFromUserPayload(
-        getContext().getUserPayload());
+    Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
     this.jobConf = new JobConf(conf);
     // Add tokens to the jobConf - in case they are accessed within the RW / OF
     jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index 4e22ca0..6caed26 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -68,6 +68,7 @@ import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.IDConverter;
@@ -141,7 +142,7 @@ public abstract class MRTask extends AbstractLogicalIOProcessor {
             processorContext.getTaskIndex()),
         processorContext.getTaskAttemptNumber());
 
-    byte[] userPayload = processorContext.getUserPayload();
+    UserPayload userPayload = processorContext.getUserPayload();
     Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
     if (conf instanceof JobConf) {
       this.jobConf = (JobConf)conf;

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
index fc8a72f..d9ff1b1 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/common/TestMRInputSplitDistributor.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.mapreduce.common;
 
+import org.apache.tez.dag.api.UserPayload;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -66,7 +68,7 @@ public class TestMRInputSplitDistributor {
     MRInputUserPayloadProto.Builder payloadProto = MRInputUserPayloadProto.newBuilder();
     payloadProto.setSplits(splitsProtoBuilder.build());
     payloadProto.setConfigurationBytes(confByteString);
-    byte[] userPayload = payloadProto.build().toByteArray();
+    UserPayload userPayload = new UserPayload(payloadProto.build().toByteArray());
 
     InputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload);
     MRInputSplitDistributor splitDist = new MRInputSplitDistributor(context);
@@ -114,7 +116,7 @@ public class TestMRInputSplitDistributor {
     MRInputUserPayloadProto.Builder payloadProto = MRInputUserPayloadProto.newBuilder();
     payloadProto.setSplits(splitsProtoBuilder.build());
     payloadProto.setConfigurationBytes(confByteString);
-    byte[] userPayload = payloadProto.build().toByteArray();
+    UserPayload userPayload = new UserPayload(payloadProto.build().toByteArray());
 
     InputInitializerContext context = new TezRootInputInitializerContextForTest(userPayload);
     MRInputSplitDistributor splitDist = new MRInputSplitDistributor(context);
@@ -146,11 +148,11 @@ public class TestMRInputSplitDistributor {
       InputInitializerContext {
 
     private final ApplicationId appId;
-    private final byte[] payload;
+    private final UserPayload payload;
 
-    TezRootInputInitializerContextForTest(byte[] payload) throws IOException {
+    TezRootInputInitializerContextForTest(UserPayload payload) throws IOException {
       appId = ApplicationId.newInstance(1000, 200);
-      this.payload = payload;
+      this.payload = payload == null ? new UserPayload(null) : payload;
     }
 
     @Override
@@ -169,7 +171,7 @@ public class TestMRInputSplitDistributor {
     }
 
     @Override
-    public byte[] getInputUserPayload() {
+    public UserPayload getInputUserPayload() {
       return payload;
     }
 
@@ -204,7 +206,7 @@ public class TestMRInputSplitDistributor {
     }
 
     @Override
-    public byte[] getUserPayload() {
+    public UserPayload getUserPayload() {
       throw new UnsupportedOperationException("getUserPayload not implemented in this mock");
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
index e2ace27..321ec75 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/input/TestMultiMRInput.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
@@ -109,8 +110,7 @@ public class TestMultiMRInput {
     assertEquals(1, splits.length);
 
     MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]);
-    InputDataInformationEvent event = new InputDataInformationEvent(0,
-        splitProto.toByteArray());
+    InputDataInformationEvent event = new InputDataInformationEvent(0, splitProto.toByteArray());
 
     eventList.clear();
     eventList.add(event);
@@ -169,12 +169,10 @@ public class TestMultiMRInput {
     assertEquals(2, splits.length);
 
     MRSplitProto splitProto1 = MRInputHelpers.createSplitProto(splits[0]);
-    InputDataInformationEvent event1 = new InputDataInformationEvent(0,
-        splitProto1.toByteArray());
+    InputDataInformationEvent event1 = new InputDataInformationEvent(0, splitProto1.toByteArray());
 
     MRSplitProto splitProto2 = MRInputHelpers.createSplitProto(splits[1]);
-    InputDataInformationEvent event2 = new InputDataInformationEvent(0,
-        splitProto2.toByteArray());
+    InputDataInformationEvent event2 = new InputDataInformationEvent(0, splitProto2.toByteArray());
 
     eventList.clear();
     eventList.add(event1);
@@ -222,10 +220,8 @@ public class TestMultiMRInput {
     assertEquals(1, splits.length);
 
     MRSplitProto splitProto = MRInputHelpers.createSplitProto(splits[0]);
-    InputDataInformationEvent event1 = new InputDataInformationEvent(0,
-        splitProto.toByteArray());
-    InputDataInformationEvent event2 = new InputDataInformationEvent(1,
-        splitProto.toByteArray());
+    InputDataInformationEvent event1 = new InputDataInformationEvent(0, splitProto.toByteArray());
+    InputDataInformationEvent event2 = new InputDataInformationEvent(1, splitProto.toByteArray());
 
     eventList.clear();
     eventList.add(event1);
@@ -254,7 +250,7 @@ public class TestMultiMRInput {
     doReturn(1).when(inputContext).getTaskIndex();
     doReturn(1).when(inputContext).getTaskVertexIndex();
     doReturn("taskVertexName").when(inputContext).getTaskVertexName();
-    doReturn(payload).when(inputContext).getUserPayload();
+    doReturn(new UserPayload(payload)).when(inputContext).getUserPayload();
     return inputContext;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 3731c64..79c69ab 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -53,6 +53,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.TezTestUtils;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 80a2a30..5727260 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -35,6 +35,7 @@ import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.TestUmbilical;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -123,9 +124,9 @@ public class TestMapProcessor {
 
     InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
         new InputDescriptor(MRInputLegacy.class.getName())
-            .setUserPayload(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
+            .setUserPayload(new UserPayload(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
                 .setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf)).build()
-                .toByteArray()),
+                .toByteArray())),
         1);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", 
         new OutputDescriptor(LocalOnFileSorterOutput.class.getName())

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 89946c6..493ee57 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -43,6 +43,7 @@ import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.TestUmbilical;
 import org.apache.tez.mapreduce.TezTestUtils;
 import org.apache.tez.mapreduce.hadoop.IDConverter;
@@ -132,9 +133,9 @@ public class TestReduceProcessor {
 
     InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
         new InputDescriptor(MRInputLegacy.class.getName())
-            .setUserPayload(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
+            .setUserPayload(new UserPayload(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
                 .setConfigurationBytes(TezUtils.createByteStringFromConf(jobConf)).build()
-                .toByteArray()),
+                .toByteArray())),
         1);
     OutputSpec mapOutputSpec = new OutputSpec("NullDestVertex", 
         new OutputDescriptor(LocalOnFileSorterOutput.class.getName()).

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
index 803d9e2..b682941 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
@@ -19,11 +19,14 @@
 package org.apache.tez.common;
 
 import com.google.protobuf.ByteString;
+
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
 import org.apache.tez.runtime.api.events.DataMovementEvent;
 import org.apache.tez.runtime.api.events.EventProtos;
 import org.apache.tez.runtime.api.events.InputDataInformationEvent;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import org.apache.tez.runtime.api.events.EventProtos.VertexManagerEventProto;
 
 public class ProtoConverters {
 
@@ -64,8 +67,23 @@ public class ProtoConverters {
       EventProtos.CompositeEventProto proto) {
     return new CompositeDataMovementEvent(proto.getStartIndex(),
         proto.getCount(),
-        proto.hasUserPayload() ?
-            proto.getUserPayload().toByteArray() : null);
+        proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null);
+  }
+  
+  public static EventProtos.VertexManagerEventProto convertVertexManagerEventToProto(
+      VertexManagerEvent event) {
+    EventProtos.VertexManagerEventProto.Builder vmBuilder = VertexManagerEventProto.newBuilder();
+    vmBuilder.setTargetVertexName(event.getTargetVertexName());
+    if (event.getUserPayload() != null) {
+      vmBuilder.setUserPayload(ByteString.copyFrom(event.getUserPayload()));
+    }
+    return vmBuilder.build();
+  }
+  
+  public static VertexManagerEvent convertVertexManagerEventFromProto(
+      EventProtos.VertexManagerEventProto vmProto) {
+    return new VertexManagerEvent(vmProto.getTargetVertexName(),
+        vmProto.hasUserPayload() ? vmProto.getUserPayload().toByteArray() : null);
   }
 
   public static EventProtos.RootInputDataInformationEventProto
@@ -84,8 +102,7 @@ public class ProtoConverters {
       convertRootInputDataInformationEventFromProto(
       EventProtos.RootInputDataInformationEventProto proto) {
     InputDataInformationEvent diEvent = new InputDataInformationEvent(
-        proto.getSourceIndex(), proto.getUserPayload() != null ? proto.getUserPayload()
-            .toByteArray() : null);
+        proto.getSourceIndex(), proto.hasUserPayload() ? proto.getUserPayload().toByteArray() : null);
     diEvent.setTargetIndex(proto.getTargetIndex());
     return diEvent;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
index fea1350..70bfad7 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezEvent.java
@@ -46,8 +46,6 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
 import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
 
-import com.google.protobuf.ByteString;
-
 public class TezEvent implements Writable {
 
   private EventType eventType;
@@ -139,13 +137,8 @@ public class TezEvent implements Writable {
                 (CompositeDataMovementEvent) event).toByteArray();
         break;
       case VERTEX_MANAGER_EVENT:
-        VertexManagerEvent vmEvt = (VertexManagerEvent) event;
-        VertexManagerEventProto.Builder vmBuilder = VertexManagerEventProto.newBuilder();
-        vmBuilder.setTargetVertexName(vmEvt.getTargetVertexName());
-        if (vmEvt.getUserPayload() != null) {
-          vmBuilder.setUserPayload(ByteString.copyFrom(vmEvt.getUserPayload()));
-        }
-        eventBytes = vmBuilder.build().toByteArray();
+        eventBytes = ProtoConverters.convertVertexManagerEventToProto((VertexManagerEvent) event)
+            .toByteArray();
         break;
       case INPUT_READ_ERROR_EVENT:
         InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
@@ -214,10 +207,8 @@ public class TezEvent implements Writable {
         event = ProtoConverters.convertCompositeDataMovementEventFromProto(cProto);
         break;
       case VERTEX_MANAGER_EVENT:
-        VertexManagerEventProto vmProto =
-            VertexManagerEventProto.parseFrom(eventBytes);
-        event = new VertexManagerEvent(vmProto.getTargetVertexName(),
-            vmProto.getUserPayload() != null ? vmProto.getUserPayload().toByteArray() : null);
+        VertexManagerEventProto vmProto = VertexManagerEventProto.parseFrom(eventBytes);
+        event = ProtoConverters.convertVertexManagerEventFromProto(vmProto);
         break;
       case INPUT_READ_ERROR_EVENT:
         InputReadErrorEventProto ideProto =

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
index d39e5b3..4a0b646 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java
@@ -31,11 +31,10 @@ import javax.annotation.Nullable;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.InputReadyTracker;
 import org.apache.tez.runtime.RuntimeTask;
@@ -49,7 +48,7 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor;
 public class TezInputContextImpl extends TezTaskContextImpl
     implements InputContext {
 
-  private final TezUserPayload userPayload;
+  private final UserPayload userPayload;
   private final String sourceVertexName;
   private final EventMetaData sourceInfo;
   private final int inputIndex;
@@ -61,7 +60,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
                              int appAttemptNumber,
                              TezUmbilical tezUmbilical, String dagName, String taskVertexName,
                              String sourceVertexName, TezTaskAttemptID taskAttemptID,
-                             TezCounters counters, int inputIndex, @Nullable byte[] userPayload,
+                             TezCounters counters, int inputIndex, @Nullable UserPayload userPayload,
                              RuntimeTask runtimeTask,
                              Map<String, ByteBuffer> serviceConsumerMetadata,
                              Map<String, String> auxServiceEnv, MemoryDistributor memDist,
@@ -75,7 +74,7 @@ public class TezInputContextImpl extends TezTaskContextImpl
     checkNotNull(sourceVertexName, "sourceVertexName is null");
     checkNotNull(inputs, "input map is null");
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
-    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+    this.userPayload = userPayload == null ? new UserPayload(null) : userPayload;
     this.inputIndex = inputIndex;
     this.sourceVertexName = sourceVertexName;
     this.sourceInfo = new EventMetaData(
@@ -106,10 +105,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
     tezUmbilical.addEvents(tezEvents);
   }
 
-  @Nullable
   @Override
-  public byte[] getUserPayload() {
-    return userPayload.getPayload();
+  public UserPayload getUserPayload() {
+    return userPayload;
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
index f43282a..f0cecc2 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java
@@ -5,25 +5,22 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import javax.annotation.Nullable;
 
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.tez.common.TezUserPayload;
-import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.InputReadyTracker;
-import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.MergedLogicalInput;
 import org.apache.tez.runtime.api.MergedInputContext;
 
 
 public class TezMergedInputContextImpl implements MergedInputContext {
 
-  private final TezUserPayload userPayload;
+  private final UserPayload userPayload;
   private final String groupInputName;
   private final Map<String, MergedLogicalInput> groupInputsMap;
   private final InputReadyTracker inputReadyTracker;
   private final String[] workDirs;
 
-  public TezMergedInputContextImpl(@Nullable byte[] userPayload, String groupInputName,
+  public TezMergedInputContextImpl(@Nullable UserPayload userPayload, String groupInputName,
                                    Map<String, MergedLogicalInput> groupInputsMap,
                                    InputReadyTracker inputReadyTracker, String[] workDirs) {
     checkNotNull(groupInputName, "groupInputName is null");
@@ -31,15 +28,14 @@ public class TezMergedInputContextImpl implements MergedInputContext {
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
     this.groupInputName = groupInputName;
     this.groupInputsMap = groupInputsMap;
-    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+    this.userPayload = userPayload == null ? new UserPayload(null) : userPayload;
     this.inputReadyTracker = inputReadyTracker;
     this.workDirs = workDirs;
   }
 
-  @Nullable
   @Override
-  public byte[] getUserPayload() {
-    return userPayload.getPayload();
+  public UserPayload getUserPayload() {
+    return userPayload;
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
index 2d0d606..8f1fe85 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezOutputContextImpl.java
@@ -31,11 +31,10 @@ import javax.annotation.Nullable;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.Event;
@@ -47,7 +46,7 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor;
 public class TezOutputContextImpl extends TezTaskContextImpl
     implements OutputContext {
 
-  private final TezUserPayload userPayload;
+  private final UserPayload userPayload;
   private final String destinationVertexName;
   private final EventMetaData sourceInfo;
   private final int outputIndex;
@@ -58,7 +57,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
       String taskVertexName,
       String destinationVertexName,
       TezTaskAttemptID taskAttemptID, TezCounters counters, int outputIndex,
-      @Nullable byte[] userPayload, RuntimeTask runtimeTask,
+      @Nullable UserPayload userPayload, RuntimeTask runtimeTask,
       Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
       OutputDescriptor outputDescriptor, ObjectRegistry objectRegistry) {
@@ -68,7 +67,7 @@ public class TezOutputContextImpl extends TezTaskContextImpl
         auxServiceEnv, memDist, outputDescriptor, objectRegistry);
     checkNotNull(outputIndex, "outputIndex is null");
     checkNotNull(destinationVertexName, "destinationVertexName is null");
-    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+    this.userPayload = userPayload == null ? new UserPayload(null) : userPayload;
     this.outputIndex = outputIndex;
     this.destinationVertexName = destinationVertexName;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
@@ -96,10 +95,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl
     tezUmbilical.addEvents(tezEvents);
   }
 
-  @Nullable
   @Override
-  public byte[] getUserPayload() {
-    return userPayload.getPayload();
+  public UserPayload getUserPayload() {
+    return userPayload;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
index a20036e..eefa337 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezProcessorContextImpl.java
@@ -31,13 +31,10 @@ import java.util.Map;
 
 import javax.annotation.Nullable;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUserPayload;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.InputReadyTracker;
 import org.apache.tez.runtime.RuntimeTask;
@@ -50,16 +47,14 @@ import org.apache.tez.runtime.common.resources.MemoryDistributor;
 
 public class TezProcessorContextImpl extends TezTaskContextImpl implements ProcessorContext {
 
-  private static final Log LOG = LogFactory.getLog(TezProcessorContextImpl.class);
-  
-  private final TezUserPayload userPayload;
+  private final UserPayload userPayload;
   private final EventMetaData sourceInfo;
   private final InputReadyTracker inputReadyTracker;
 
   public TezProcessorContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber,
       TezUmbilical tezUmbilical, String dagName, String vertexName,
       TezTaskAttemptID taskAttemptID, TezCounters counters,
-      @Nullable byte[] userPayload, RuntimeTask runtimeTask,
+      @Nullable UserPayload userPayload, RuntimeTask runtimeTask,
       Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
       ProcessorDescriptor processorDescriptor, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry) {
@@ -67,7 +62,7 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
         counters, runtimeTask, tezUmbilical, serviceConsumerMetadata,
         auxServiceEnv, memDist, processorDescriptor, objectRegistry);
     checkNotNull(inputReadyTracker, "inputReadyTracker is null");
-    this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload);
+    this.userPayload = userPayload == null ? new UserPayload(null) : userPayload;
     this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
         taskVertexName, "", taskAttemptID);
     this.inputReadyTracker = inputReadyTracker;
@@ -84,10 +79,9 @@ public class TezProcessorContextImpl extends TezTaskContextImpl implements Proce
     tezUmbilical.addEvents(tezEvents);
   }
 
-  @Nullable
   @Override
-  public byte[] getUserPayload() {
-    return userPayload.getPayload();
+  public UserPayload getUserPayload() {
+    return userPayload;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
index 45aba9c..59db901 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezTaskContextImpl.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
 import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezEntityDescriptor;
+import org.apache.tez.dag.api.EntityDescriptor;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.RuntimeTask;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
@@ -58,7 +58,7 @@ public abstract class TezTaskContextImpl implements TaskContext {
   private final int appAttemptNumber;
   private final Map<String, String> auxServiceEnv;
   protected final MemoryDistributor initialMemoryDistributor;
-  protected final TezEntityDescriptor<?> descriptor;
+  protected final EntityDescriptor<?> descriptor;
   private final String dagName;
   private final ObjectRegistry objectRegistry;
 
@@ -68,7 +68,7 @@ public abstract class TezTaskContextImpl implements TaskContext {
       TezCounters counters, RuntimeTask runtimeTask,
       TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata,
       Map<String, String> auxServiceEnv, MemoryDistributor memDist,
-      TezEntityDescriptor<?> descriptor, ObjectRegistry objectRegistry) {
+      EntityDescriptor<?> descriptor, ObjectRegistry objectRegistry) {
     checkNotNull(conf, "conf is null");
     checkNotNull(dagName, "dagName is null");
     checkNotNull(taskVertexName, "taskVertexName is null");

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
index 6d58635..042f837 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezEntityDescriptor;
+import org.apache.tez.dag.api.EntityDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.runtime.api.MemoryUpdateCallback;
 import org.apache.tez.runtime.api.InputContext;
@@ -93,7 +93,7 @@ public class MemoryDistributor {
    * Used by the Tez framework to request memory on behalf of user requests.
    */
   public void requestMemory(long requestSize, MemoryUpdateCallback callback,
-      TaskContext taskContext, TezEntityDescriptor<?> descriptor) {
+      TaskContext taskContext, EntityDescriptor<?> descriptor) {
     registerRequest(requestSize, callback, taskContext, descriptor);
   }
   
@@ -154,7 +154,7 @@ public class MemoryDistributor {
   }
 
   private long registerRequest(long requestSize, MemoryUpdateCallback callback,
-      TaskContext entityContext, TezEntityDescriptor<?> descriptor) {
+      TaskContext entityContext, EntityDescriptor<?> descriptor) {
     Preconditions.checkArgument(requestSize >= 0);
     Preconditions.checkNotNull(callback);
     Preconditions.checkNotNull(entityContext);
@@ -211,7 +211,7 @@ public class MemoryDistributor {
     private final InitialMemoryRequestContext requestContext;
 
     public RequestorInfo(TaskContext taskContext, long requestSize,
-        final MemoryUpdateCallback callback, TezEntityDescriptor<?> descriptor) {
+        final MemoryUpdateCallback callback, EntityDescriptor<?> descriptor) {
       InitialMemoryRequestContext.ComponentType type;
       String componentVertexName;
       if (taskContext instanceof InputContext) {

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
index d171365..27446bb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java
@@ -36,6 +36,7 @@ import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -432,7 +433,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin {
                     remainderRangeForLastShuffler : basePartitionRange));
         EdgeManagerPluginDescriptor edgeManagerDescriptor =
             new EdgeManagerPluginDescriptor(CustomShuffleEdgeManager.class.getName());
-        edgeManagerDescriptor.setUserPayload(edgeManagerConfig.toUserPayload());
+        edgeManagerDescriptor.setUserPayload(new UserPayload(edgeManagerConfig.toUserPayload()));
         edgeManagers.put(vertex, edgeManagerDescriptor);
       }
       

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 8e98d1c..4ed05aa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -52,6 +52,7 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
index bf41a26..81c1185 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/HadoopKeyValuesBasedBaseEdgeConfigurer.java
@@ -21,6 +21,7 @@ package org.apache.tez.runtime.library.conf;
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.tez.dag.api.UserPayload;
 
 @InterfaceAudience.Private
 abstract class HadoopKeyValuesBasedBaseEdgeConfigurer {
@@ -29,7 +30,7 @@ abstract class HadoopKeyValuesBasedBaseEdgeConfigurer {
    * Get the payload for the configured Output
    * @return output configuration as a byte array
    */
-  public abstract byte[] getOutputPayload();
+  public abstract UserPayload getOutputPayload();
 
   /**
    * Get the output class name
@@ -41,7 +42,7 @@ abstract class HadoopKeyValuesBasedBaseEdgeConfigurer {
    * Get the payload for the configured Input
    * @return input configuration as a byte array
    */
-  public abstract byte[] getInputPayload();
+  public abstract UserPayload getInputPayload();
 
   /**
    * Get the input class name

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfigurer.java
index d2cd85f..09ffffb 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileSortedOutputConfigurer.java
@@ -21,6 +21,7 @@
 package org.apache.tez.runtime.library.conf;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Map;
 
@@ -33,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
@@ -172,7 +174,7 @@ public class OnFileSortedOutputConfigurer {
    */
   public byte[] toByteArray() {
     try {
-      return TezUtils.createUserPayloadFromConf(conf);
+      return TezUtils.createUserPayloadFromConf(conf).getPayload();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -181,7 +183,7 @@ public class OnFileSortedOutputConfigurer {
   @InterfaceAudience.Private
   public void fromByteArray(byte[] payload) {
     try {
-      this.conf = TezUtils.createConfFromUserPayload(payload);
+      this.conf = TezUtils.createConfFromUserPayload(new UserPayload(payload));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfigurer.java
index f84315f..0b2ce15 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedKVOutputConfigurer.java
@@ -21,6 +21,7 @@
 package org.apache.tez.runtime.library.conf;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Map;
 
@@ -33,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
@@ -108,7 +110,7 @@ public class OnFileUnorderedKVOutputConfigurer {
    */
   public byte[] toByteArray() {
     try {
-      return TezUtils.createUserPayloadFromConf(conf);
+      return TezUtils.createUserPayloadFromConf(conf).getPayload();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -117,7 +119,7 @@ public class OnFileUnorderedKVOutputConfigurer {
   @InterfaceAudience.Private
   public void fromByteArray(byte[] payload) {
     try {
-      this.conf = TezUtils.createConfFromUserPayload(payload);
+      this.conf = TezUtils.createConfFromUserPayload(new UserPayload(payload));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfigurer.java
index 47f32ee..d86c76b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OnFileUnorderedPartitionedKVOutputConfigurer.java
@@ -21,6 +21,7 @@
 package org.apache.tez.runtime.library.conf;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Map;
 
@@ -33,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
@@ -122,7 +124,7 @@ public class OnFileUnorderedPartitionedKVOutputConfigurer {
    */
   public byte[] toByteArray() {
     try {
-      return TezUtils.createUserPayloadFromConf(conf);
+      return TezUtils.createUserPayloadFromConf(conf).getPayload();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -131,7 +133,7 @@ public class OnFileUnorderedPartitionedKVOutputConfigurer {
   @InterfaceAudience.Private
   public void fromByteArray(byte[] payload) {
     try {
-      this.conf = TezUtils.createConfFromUserPayload(payload);
+      this.conf = TezUtils.createConfFromUserPayload(new UserPayload(payload));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
index 029d640..f0d06af 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/OrderedPartitionedKVEdgeConfigurer.java
@@ -19,9 +19,11 @@
 package org.apache.tez.runtime.library.conf;
 
 import javax.annotation.Nullable;
+
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -29,6 +31,7 @@ import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
 
 /**
@@ -85,8 +88,8 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
   }
 
   @Override
-  public byte[] getOutputPayload() {
-    return outputConf.toByteArray();
+  public UserPayload getOutputPayload() {
+    return new UserPayload(outputConf.toByteArray());
   }
 
   @Override
@@ -95,8 +98,8 @@ public class OrderedPartitionedKVEdgeConfigurer extends HadoopKeyValuesBasedBase
   }
 
   @Override
-  public byte[] getInputPayload() {
-    return inputConf.toByteArray();
+  public UserPayload getInputPayload() {
+    return new UserPayload(inputConf.toByteArray());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/6507bda6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfigurer.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfigurer.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfigurer.java
index 6faa658..3320c87 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfigurer.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/ShuffledMergedInputConfigurer.java
@@ -21,6 +21,7 @@
 package org.apache.tez.runtime.library.conf;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Map;
 
@@ -33,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.input.ShuffledMergedInput;
@@ -242,7 +244,7 @@ public class ShuffledMergedInputConfigurer {
    */
   public byte[] toByteArray() {
     try {
-      return TezUtils.createUserPayloadFromConf(conf);
+      return TezUtils.createUserPayloadFromConf(conf).getPayload();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -250,7 +252,7 @@ public class ShuffledMergedInputConfigurer {
 
   public void fromByteArray(byte[] payload) {
     try {
-      this.conf = TezUtils.createConfFromUserPayload(payload);
+      this.conf = TezUtils.createConfFromUserPayload(new UserPayload(payload));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }