You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2016/06/30 23:30:16 UTC

tez git commit: TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container). Contributed by Hitesh Shah.

Repository: tez
Updated Branches:
  refs/heads/master 71bb2defe -> 3b08cbf90


TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container). Contributed by Hitesh Shah.


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3b08cbf9
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3b08cbf9
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3b08cbf9

Branch: refs/heads/master
Commit: 3b08cbf907784de463c9e3c05147b5c6d681251d
Parents: 71bb2de
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu Jun 30 16:30:01 2016 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Jun 30 16:30:01 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/tez/dag/api/TezConfiguration.java    | 12 +--
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 10 ++-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  3 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 14 ++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 22 +++--
 .../tez/dag/app/dag/impl/TestVertexImpl2.java   | 28 +++++-
 .../tez/dag/app/rm/TestContainerReuse.java      |  2 +-
 .../org/apache/tez/util/ProtoConverters.java    | 31 ++++++-
 .../src/test/proto/TezDaemonProtocol.proto      |  1 +
 .../tez/mapreduce/output/TestMROutput.java      |  2 +-
 .../tez/mapreduce/processor/MapUtils.java       |  2 +-
 .../processor/reduce/TestReduceProcessor.java   |  2 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  6 ++
 .../apache/tez/runtime/api/impl/TaskSpec.java   | 53 ++++++++++--
 .../apache/tez/runtime/task/TezTaskRunner2.java | 13 ++-
 .../TestLogicalIOProcessorRuntimeTask.java      |  2 +-
 .../tez/runtime/api/impl/TestTaskSpec.java      | 91 ++++++++++++++++++++
 .../tez/runtime/task/TestTaskExecution2.java    |  2 +-
 .../tez/runtime/task/TestTezTaskRunner2.java    | 65 ++++++++++++++
 20 files changed, 326 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ebc4e79..9a5904f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container).
   TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts.
   TEZ-3314. Double counting input bytes in MultiMRInput.
   TEZ-3308. Add counters to capture input split length.
@@ -70,6 +71,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container).
   TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts.
   TEZ-3314. Double counting input bytes in MultiMRInput.
   TEZ-3308. Add counters to capture input split length.

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 4118bb5..936c5db 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -807,7 +807,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
-  @ConfigurationScope(Scope.AM)
+  @ConfigurationScope(Scope.VERTEX)
   @ConfigurationProperty(type="boolean")
   public static final String TEZ_TASK_SCALE_MEMORY_ENABLED = TEZ_TASK_PREFIX
       + "scale.memory.enabled";
@@ -819,7 +819,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
-  @ConfigurationScope(Scope.AM)
+  @ConfigurationScope(Scope.VERTEX)
   @ConfigurationProperty
   public static final String TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS = TEZ_TASK_PREFIX
       + "scale.memory.allocator.class";
@@ -833,7 +833,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
-  @ConfigurationScope(Scope.AM)
+  @ConfigurationScope(Scope.VERTEX)
   @ConfigurationProperty(type="double")
   public static final String TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION = TEZ_TASK_PREFIX
       + "scale.memory.reserve-fraction";
@@ -846,7 +846,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
-  @ConfigurationScope(Scope.AM)
+  @ConfigurationScope(Scope.VERTEX)
   @ConfigurationProperty(type="float")
   public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO =
       TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.per-io";
@@ -856,7 +856,7 @@ public class TezConfiguration extends Configuration {
   /**
    * Max cumulative total reservation for additional IOs.
    */
-  @ConfigurationScope(Scope.AM)
+  @ConfigurationScope(Scope.VERTEX)
   @ConfigurationProperty(type="float")
   public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX =
       TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.max";
@@ -867,7 +867,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
-  @ConfigurationScope(Scope.AM)
+  @ConfigurationScope(Scope.VERTEX)
   @ConfigurationProperty
   public static final String TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS =
       TEZ_TASK_PREFIX + "scale.memory.ratios";

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index a6c6c02..fd6d446 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -198,7 +198,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private long cachedCountersTimestamp = 0;
   private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>();
 
+  // Combined configs for the DAG
   private final Configuration dagConf;
+  // DAG specific configs only
+  // Useful when trying to serialize only the diff from global configs
+  private final Configuration dagOnlyConf;
+
   private final DAGPlan jobPlan;
 
   private final AtomicBoolean internalErrorTriggered = new AtomicBoolean(false);
@@ -499,6 +504,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     this.dagId = dagId;
     this.jobPlan = jobPlan;
     this.dagConf = new Configuration(amConf);
+    this.dagOnlyConf = new Configuration(false);
     Iterator<PlanKeyValuePair> iter =
         jobPlan.getDagConf().getConfKeyValuesList().iterator();
     // override the amConf by using DAG level configuration
@@ -506,6 +512,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       PlanKeyValuePair keyValPair = iter.next();
       TezConfiguration.validateProperty(keyValPair.getKey(), Scope.DAG);
       this.dagConf.set(keyValPair.getKey(), keyValPair.getValue());
+      this.dagOnlyConf.set(keyValPair.getKey(), keyValPair.getValue());
     }
     this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>";
     this.userName = appUserName;
@@ -1626,7 +1633,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         dag.eventHandler, dag.taskCommunicatorManagerInterface,
         dag.clock, dag.taskHeartbeatHandler,
         !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
-        dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker);
+        dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker,
+        dag.dagOnlyConf);
     return v;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 28a1c5e..ec7db61 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -723,7 +723,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     TaskSpec taskSpec = new TaskSpec(attemptId,
         baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
         baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
-        baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
+        baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs(),
+        baseTaskSpec.getTaskConf());
     return new TaskAttemptImpl(attemptId, eventHandler,
         taskCommunicatorManagerInterface, conf, clock, taskHeartbeatHandler, appContext,
         (failedAttempts > 0), taskResource, containerContext, leafVertex, getVertex(),

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 6b79e98..01bca8f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -236,7 +236,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private long cachedCountersTimestamp = 0;
   private Resource taskResource;
 
+  // Merged/combined vertex level config
   private Configuration vertexConf;
+  // Vertex specific configs only (also includes the DAG specific configs)
+  // Useful when trying to serialize only the diff from global configs
+  @VisibleForTesting
+  Configuration vertexOnlyConf;
   
   private final boolean isSpeculationEnabled;
 
@@ -854,19 +859,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       TaskHeartbeatHandler thh, boolean commitVertexOutputs,
       AppContext appContext, VertexLocationHint vertexLocationHint,
       Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption,
-      StateChangeNotifier entityStatusTracker) {
+      StateChangeNotifier entityStatusTracker, Configuration dagOnlyConf) {
     this.vertexId = vertexId;
     this.vertexPlan = vertexPlan;
     this.vertexName = StringInterner.weakIntern(vertexName);
     this.vertexConf = new Configuration(dagConf);
-    // override dag configuration by using vertex's specified configuration
+    this.vertexOnlyConf = new Configuration(dagOnlyConf);
     if (vertexPlan.hasVertexConf()) {
       ConfigurationProto confProto = vertexPlan.getVertexConf();
       for (PlanKeyValuePair keyValuePair : confProto.getConfKeyValuesList()) {
         TezConfiguration.validateProperty(keyValuePair.getKey(), Scope.VERTEX);
         vertexConf.set(keyValuePair.getKey(), keyValuePair.getValue());
+        vertexOnlyConf.set(keyValuePair.getKey(), keyValuePair.getValue());
       }
     }
+
+
     this.clock = clock;
     this.appContext = appContext;
     this.commitVertexOutputs = commitVertexOutputs;
@@ -1567,7 +1575,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     return TaskSpec.createBaseTaskSpec(getDAG().getName(),
         getName(), getTotalTasks(), getProcessorDescriptor(),
         getInputSpecList(taskIndex), getOutputSpecList(taskIndex), 
-        getGroupInputSpecList(taskIndex));
+        getGroupInputSpecList(taskIndex), vertexOnlyConf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 6b30a24..d165272 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2261,6 +2261,8 @@ public class TestVertexImpl {
     LOG.info("Setting up vertices from dag plan, verticesCnt=" + vCnt);
     vertices = new HashMap<String, VertexImpl>();
     vertexIdMap = new HashMap<TezVertexID, VertexImpl>();
+    Configuration dagConf = new Configuration(false);
+    dagConf.set("abc", "foobar");
     for (int i = 0; i < vCnt; ++i) {
       VertexPlan vPlan = dagPlan.getVertex(i);
       String vName = vPlan.getName();
@@ -2272,17 +2274,18 @@ public class TestVertexImpl {
         if (customInitializer == null) {
           v = new VertexImplWithControlledInitializerManager(vertexId, vPlan, vPlan.getName(), conf,
               dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
-              clock, thh, appContext, locationHint, dispatcher, updateTracker);
+              clock, thh, appContext, locationHint, dispatcher, updateTracker, dagConf);
         } else {
           v = new VertexImplWithRunningInputInitializer(vertexId, vPlan, vPlan.getName(), conf,
               dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
-              clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker);
+              clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker,
+              dagConf);
         }
       } else {
         v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
             dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
             clock, thh, true, appContext, locationHint, vertexGroups, taskSpecificLaunchCmdOption,
-            updateTracker);
+            updateTracker, dagConf);
       }
       vertices.put(vName, v);
       vertexIdMap.put(vertexId, v);
@@ -3126,6 +3129,7 @@ public class TestVertexImpl {
     TaskEventScheduleTask event = (TaskEventScheduleTask) taskEventDispatcher.events.get(0);
     Assert.assertEquals(mockLocation, event.getTaskLocationHint());
     Assert.assertNotNull(event.getBaseTaskSpec());
+    Assert.assertEquals("foobar", event.getBaseTaskSpec().getTaskConf().get("abc"));
   }
   
   @Test(timeout = 5000)
@@ -5422,7 +5426,7 @@ public class TestVertexImpl {
       VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
           dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
           clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
-          updateTracker);
+          updateTracker, new Configuration(false));
       v.setInputVertices(new HashMap());
       vertexIdMap.put(vId, v);
       vertices.put(v.getName(), v);
@@ -5457,11 +5461,12 @@ public class TestVertexImpl {
                                                  VertexLocationHint vertexLocationHint,
                                                  DrainDispatcher dispatcher,
                                                  InputInitializer presetInitializer,
-                                                 StateChangeNotifier updateTracker) {
+                                                 StateChangeNotifier updateTracker,
+                                                 Configuration dagConf) {
       super(vertexId, vertexPlan, vertexName, conf, eventHandler,
           taskCommunicatorManagerInterface, clock, thh, true,
           appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
-          updateTracker);
+          updateTracker, dagConf);
       this.presetInitializer = presetInitializer;
     }
 
@@ -5496,11 +5501,12 @@ public class TestVertexImpl {
                                                       AppContext appContext,
                                                       VertexLocationHint vertexLocationHint,
                                                       DrainDispatcher dispatcher,
-                                                      StateChangeNotifier updateTracker) {
+                                                      StateChangeNotifier updateTracker,
+                                                      Configuration dagConf) {
       super(vertexId, vertexPlan, vertexName, conf, eventHandler,
           taskCommunicatorManagerInterface, clock, thh, true,
           appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
-          updateTracker);
+          updateTracker, dagConf);
       this.dispatcher = dispatcher;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
index 8bd288a..b1976c3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
@@ -49,6 +49,8 @@ import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex.VertexExecutionContext;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.TezNamedEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
@@ -60,6 +62,7 @@ import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
+import org.junit.Assert;
 import org.junit.Test;
 
 /**
@@ -433,13 +436,18 @@ public class TestVertexImpl2 {
       ExecutionContextTestInfoHolder vertexInfo) {
     VertexPlan vertexPlan = createVertexPlanForExeuctionContextTests(vertexInfo);
     VertexWrapper vertexWrapper =
-        new VertexWrapper(vertexInfo.appContext, vertexPlan, new Configuration(false));
+        new VertexWrapper(vertexInfo.appContext, vertexPlan, new Configuration(false), true);
     return vertexWrapper;
   }
 
   private VertexPlan createVertexPlanForExeuctionContextTests(ExecutionContextTestInfoHolder info) {
+    ConfigurationProto confProto = ConfigurationProto.newBuilder()
+        .addConfKeyValues(PlanKeyValuePair.newBuilder().setKey("foo").setValue("bar").build())
+        .addConfKeyValues(PlanKeyValuePair.newBuilder().setKey("foo1").setValue("bar2").build())
+        .build();
     VertexPlan.Builder vertexPlanBuilder = VertexPlan.newBuilder()
         .setName(info.vertexName)
+        .setVertexConf(confProto)
         .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
             .setNumTasks(10)
             .setJavaOpts("dontcare")
@@ -502,7 +510,8 @@ public class TestVertexImpl2 {
     final VertexImpl vertex;
     final VertexPlan vertexPlan;
 
-    VertexWrapper(AppContext appContext, VertexPlan vertexPlan, Configuration conf) {
+    VertexWrapper(AppContext appContext, VertexPlan vertexPlan, Configuration conf,
+                  boolean checkVertexOnlyConf) {
       if (appContext == null) {
         mockAppContext = createDefaultMockAppContext();
         DAG mockDag = mock(DAG.class);
@@ -512,6 +521,9 @@ public class TestVertexImpl2 {
         mockAppContext = appContext;
       }
 
+      Configuration dagConf = new Configuration(false);
+      dagConf.set("abc1", "xyz1");
+      dagConf.set("foo1", "bar1");
 
       this.vertexPlan = vertexPlan;
 
@@ -520,11 +532,19 @@ public class TestVertexImpl2 {
               "testvertex", conf, mock(EventHandler.class), mock(TaskCommunicatorManagerInterface.class),
               mock(Clock.class), mock(TaskHeartbeatHandler.class), false, mockAppContext,
               VertexLocationHint.create(new LinkedList<TaskLocationHint>()), null,
-              new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class));
+              new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class),
+              dagConf);
+
+      if (checkVertexOnlyConf) {
+        Assert.assertEquals("xyz1", vertex.vertexOnlyConf.get("abc1"));
+        Assert.assertEquals("bar2", vertex.vertexOnlyConf.get("foo1"));
+        Assert.assertEquals("bar", vertex.vertexOnlyConf.get("foo"));
+      }
+
     }
 
     VertexWrapper(VertexPlan vertexPlan, Configuration conf) {
-      this(null, vertexPlan, conf);
+      this(null, vertexPlan, conf, false);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 99c85ab..a45f092 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -1417,7 +1417,7 @@ public class TestContainerReuse {
       Collections.singletonList(new InputSpec("vertexName",
           InputDescriptor.create("inputClassName"), 1)),
       Collections.singletonList(new OutputSpec("vertexName",
-          OutputDescriptor.create("outputClassName"), 1)), null), ta, locationHint,
+          OutputDescriptor.create("outputClassName"), 1)), null, null), ta, locationHint,
       priority.getPriority(), containerContext, 0, 0, 0);
     return lr;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
index 60ebc53..25d61d0 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
@@ -15,12 +15,18 @@
 package org.apache.tez.util;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.runtime.api.impl.GroupInputSpec;
 import org.apache.tez.runtime.api.impl.InputSpec;
@@ -29,6 +35,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.GroupInputSpecProto;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.IOSpecProto;
 import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto;
+import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto.Builder;
 
 public class ProtoConverters {
 
@@ -65,15 +72,24 @@ public class ProtoConverters {
       }
     }
 
+    Configuration taskConf = null;
+    if (taskSpecProto.hasTaskConf()) {
+      taskConf = new Configuration(false);
+      Map<String, String> confMap =
+          DagTypeConverters.convertConfFromProto(taskSpecProto.getTaskConf());
+      for (Entry<String, String> e : confMap.entrySet()) {
+        taskConf.set(e.getKey(), e.getValue());
+      }
+    }
     TaskSpec taskSpec =
         new TaskSpec(taskAttemptID, taskSpecProto.getDagName(), taskSpecProto.getVertexName(),
             taskSpecProto.getVertexParallelism(), processorDescriptor, inputSpecList,
-            outputSpecList, groupInputSpecs);
+            outputSpecList, groupInputSpecs, taskConf);
     return taskSpec;
   }
 
   public static TaskSpecProto convertTaskSpecToProto(TaskSpec taskSpec) {
-    TaskSpecProto.Builder builder = TaskSpecProto.newBuilder();
+    Builder builder = TaskSpecProto.newBuilder();
     builder.setTaskAttemptIdString(taskSpec.getTaskAttemptID().toString());
     builder.setDagName(taskSpec.getDAGName());
     builder.setVertexName(taskSpec.getVertexName());
@@ -102,6 +118,17 @@ public class ProtoConverters {
 
       }
     }
+    if (taskSpec.getTaskConf() != null) {
+      ConfigurationProto.Builder confBuilder = ConfigurationProto.newBuilder();
+      Iterator<Entry<String, String>> iter = taskSpec.getTaskConf().iterator();
+      while (iter.hasNext()) {
+        Entry<String, String> entry = iter.next();
+        confBuilder.addConfKeyValues(PlanKeyValuePair.newBuilder()
+            .setKey(entry.getKey())
+            .setValue(entry.getValue()).build());
+      }
+      builder.setTaskConf(confBuilder.build());
+    }
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto b/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
index 2f8b2e6..a01a299 100644
--- a/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
+++ b/tez-ext-service-tests/src/test/proto/TezDaemonProtocol.proto
@@ -44,6 +44,7 @@ message TaskSpecProto {
   repeated IOSpecProto output_specs = 6;
   repeated GroupInputSpecProto grouped_input_specs = 7;
   optional int32 vertex_parallelism = 8;
+  optional ConfigurationProto task_conf = 9;
 }
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index 0c1dc66..05bcd98 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -192,7 +192,7 @@ public class TestMROutput {
         dagName, vertexName, -1,
         procDesc,
         inputSpecs,
-        outputSpecs, null);
+        outputSpecs, null, null);
 
     FileSystem fs = FileSystem.getLocal(conf);
     Path workDir =

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 133ef9e..8309966 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -213,7 +213,7 @@ public class MapUtils {
         dagName, vertexName, -1,
         mapProcessorDesc,
         inputSpecs,
-        outputSpecs, null);
+        outputSpecs, null, null);
 
     Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
     serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 382bc0e..1922c53 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -205,7 +205,7 @@ public class TestReduceProcessor {
         reduceVertexName, -1,
         reduceProcessorDesc,
         Collections.singletonList(reduceInputSpec),
-        Collections.singletonList(reduceOutputSpec), null);
+        Collections.singletonList(reduceOutputSpec), null, null);
 
     Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
     serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 0863e65..e49791f 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -1054,4 +1054,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
   public HadoopShim getHadoopShim() {
     return hadoopShim;
   }
+
+  @Private
+  @VisibleForTesting
+  public Configuration getTaskConf() {
+    return tezConf;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
index 7fce1d4..78bb1e9 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import javax.annotation.Nullable;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -42,6 +44,7 @@ public class TaskSpec implements Writable {
   private List<OutputSpec> outputSpecList;
   private List<GroupInputSpec> groupInputSpecList;
   private int vertexParallelism = -1;
+  private Configuration taskConf;
 
   public TaskSpec() {
   }
@@ -49,17 +52,27 @@ public class TaskSpec implements Writable {
   public static TaskSpec createBaseTaskSpec(String dagName, String vertexName,
       int vertexParallelism, ProcessorDescriptor processorDescriptor,
       List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
-      @Nullable List<GroupInputSpec> groupInputSpecList) {
+      @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) {
     return new TaskSpec(dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList,
-        outputSpecList, groupInputSpecList);
+        outputSpecList, groupInputSpecList, taskConf);
   }
 
   public TaskSpec(
       String dagName, String vertexName,
       int vertexParallelism,
       ProcessorDescriptor processorDescriptor,
-      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, 
+      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
       @Nullable List<GroupInputSpec> groupInputSpecList) {
+    this(dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList,
+        outputSpecList, groupInputSpecList, null);
+  }
+
+  public TaskSpec(
+      String dagName, String vertexName,
+      int vertexParallelism,
+      ProcessorDescriptor processorDescriptor,
+      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+      @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) {
     checkNotNull(dagName, "dagName is null");
     checkNotNull(vertexName, "vertexName is null");
     checkNotNull(processorDescriptor, "processorDescriptor is null");
@@ -73,14 +86,25 @@ public class TaskSpec implements Writable {
     this.outputSpecList = outputSpecList;
     this.groupInputSpecList = groupInputSpecList;
     this.vertexParallelism = vertexParallelism;
+    this.taskConf = taskConf;
+  }
+
+  public TaskSpec(TezTaskAttemptID taskAttemptID,
+                  String dagName, String vertexName,
+                  int vertexParallelism,
+                  ProcessorDescriptor processorDescriptor,
+                  List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+                  @Nullable List<GroupInputSpec> groupInputSpecList) {
+    this(taskAttemptID, dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList,
+        outputSpecList, groupInputSpecList, null); // null taskConf: no per-task Configuration
   }
 
   public TaskSpec(TezTaskAttemptID taskAttemptID,
       String dagName, String vertexName,
       int vertexParallelism,
       ProcessorDescriptor processorDescriptor,
-      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, 
-      @Nullable List<GroupInputSpec> groupInputSpecList) {
+      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+      @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) {
     checkNotNull(taskAttemptID, "taskAttemptID is null");
     checkNotNull(dagName, "dagName is null");
     checkNotNull(vertexName, "vertexName is null");
@@ -95,6 +119,7 @@ public class TaskSpec implements Writable {
     this.outputSpecList = outputSpecList;
     this.groupInputSpecList = groupInputSpecList;
     this.vertexParallelism = vertexParallelism;
+    this.taskConf = taskConf;
   }
 
   public String getDAGName() {
@@ -133,6 +158,10 @@ public class TaskSpec implements Writable {
     return groupInputSpecList;
   }
 
+  public Configuration getTaskConf() {
+    return taskConf; // null if none was set (constructors lacking a taskConf arg pass null)
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     taskAttemptId.write(out);
@@ -157,6 +186,12 @@ public class TaskSpec implements Writable {
     } else {
       out.writeBoolean(false);
     }
+    if (taskConf != null) {
+      out.writeBoolean(true);
+      taskConf.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
   }
 
   @Override
@@ -192,6 +227,11 @@ public class TaskSpec implements Writable {
         groupInputSpecList.add(group);
       }
     }
+    boolean hasVertexConf = in.readBoolean();
+    if (hasVertexConf) {
+      taskConf = new Configuration(false);
+      taskConf.readFields(in);
+    }
   }
 
   @Override
@@ -220,6 +260,9 @@ public class TaskSpec implements Writable {
       }
       sb.append("]");
     }
+    if (taskConf != null) {
+      sb.append(", taskConfEntryCount=" + taskConf.size());
+    }
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index afa08e7..96f8474 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -17,7 +17,9 @@ package org.apache.tez.runtime.task;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -93,6 +95,7 @@ public class TezTaskRunner2 {
   private final Condition oobSignalCondition = oobSignalLock.newCondition();
 
   private volatile long taskKillStartTime  = 0;
+  final Configuration taskConf;
 
   private final HadoopShim hadoopShim;
 
@@ -114,7 +117,15 @@ public class TezTaskRunner2 {
     this.executor = executor;
     this.umbilicalAndErrorHandler = new UmbilicalAndErrorHandler();
     this.hadoopShim = hadoopShim;
-    this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs,
+    this.taskConf = new Configuration(tezConf);
+    if (taskSpec.getTaskConf() != null) {
+      Iterator<Entry<String, String>> iter = taskSpec.getTaskConf().iterator();
+      while (iter.hasNext()) {
+        Entry<String, String> entry = iter.next();
+        taskConf.set(entry.getKey(), entry.getValue());
+      }
+    }
+    this.task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, taskConf, localDirs,
         umbilicalAndErrorHandler, serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap,
         objectRegistry, pid, executionContext, memAvailable, updateSysCounters, hadoopShim);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 00e830f..ecfc424 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -190,7 +190,7 @@ public class TestLogicalIOProcessorRuntimeTask {
     ProcessorDescriptor processorDesc = createProcessorDescriptor();
     TaskSpec taskSpec = new TaskSpec(taskAttemptID,
         dagName, vertexName, parallelism, processorDesc,
-        createInputSpecList(), createOutputSpecList(), null);
+        createInputSpecList(), createOutputSpecList(), null, null);
     return taskSpec;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java
new file mode 100644
index 0000000..dfe19f8
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTaskSpec {
+  /** Round-trips a TaskSpec that carries a taskConf through write() and readFields(). */
+  @Test (timeout = 5000)
+  public void testSerDe() throws IOException {
+    ByteBuffer payload = null;
+    ProcessorDescriptor procDesc = ProcessorDescriptor.create("proc").setUserPayload(
+        UserPayload.create(payload)).setHistoryText("historyText");
+
+    List<InputSpec> inputSpecs = new ArrayList<>();
+    InputSpec inputSpec = new InputSpec("src1", InputDescriptor.create("inputClass"),10);
+    inputSpecs.add(inputSpec);
+    List<OutputSpec> outputSpecs = new ArrayList<>();
+    OutputSpec outputSpec = new OutputSpec("dest1", OutputDescriptor.create("outputClass"), 999);
+    outputSpecs.add(outputSpec);
+    List<GroupInputSpec> groupInputSpecs = null; // asserted to still be null after the round trip
+
+    Configuration taskConf = new Configuration(false);
+    taskConf.set("foo", "bar");
+
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(
+        TezVertexID.getInstance(TezDAGID.getInstance("1234", 1, 1), 1), 1), 1);
+    TaskSpec taskSpec = new TaskSpec(taId, "dagName", "vName", -1, procDesc, inputSpecs, outputSpecs,
+        groupInputSpecs, taskConf);
+    // Serialize the spec into an in-memory buffer.
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(bos);
+    taskSpec.write(out);
+    // Deserialize the bytes into a fresh, empty TaskSpec.
+    TaskSpec deSerTaskSpec = new TaskSpec();
+    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+    DataInput in = new DataInputStream(bis);
+    deSerTaskSpec.readFields(in);
+
+    Assert.assertEquals(taskSpec.getDAGName(), deSerTaskSpec.getDAGName());
+    Assert.assertEquals(taskSpec.getVertexName(), deSerTaskSpec.getVertexName());
+    Assert.assertEquals(taskSpec.getVertexParallelism(), deSerTaskSpec.getVertexParallelism());
+    Assert.assertEquals(taskSpec.getInputs().size(), deSerTaskSpec.getInputs().size());
+    Assert.assertEquals(taskSpec.getOutputs().size(), deSerTaskSpec.getOutputs().size());
+    Assert.assertNull(deSerTaskSpec.getGroupInputs());
+    Assert.assertEquals(taskSpec.getInputs().get(0).getSourceVertexName(),
+        deSerTaskSpec.getInputs().get(0).getSourceVertexName());
+    Assert.assertEquals(taskSpec.getOutputs().get(0).getDestinationVertexName(),
+        deSerTaskSpec.getOutputs().get(0).getDestinationVertexName());
+    // The per-task configuration entry must survive the round trip as well.
+    Assert.assertEquals(taskConf.get("foo"), deSerTaskSpec.getTaskConf().get("foo"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
index c3b9abd..6cb49fa 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTaskExecution2.java
@@ -740,7 +740,7 @@ public class TestTaskExecution2 {
         .setUserPayload(UserPayload.create(ByteBuffer.wrap(processorConf)));
     TaskSpec taskSpec =
         new TaskSpec(taskAttemptId, "dagName", "vertexName", -1, processorDescriptor,
-            new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null);
+            new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null, null);
 
     TezTaskRunner2 taskRunner;
     if (testRunner) {

http://git-wip-us.apache.org/repos/asf/tez/blob/3b08cbf9/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java
new file mode 100644
index 0000000..f58421a
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner2.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.common.ProtoConverters;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.hadoop.shim.DefaultHadoopShim;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestTezTaskRunner2 {
+  /** Verifies that TezTaskRunner2 layers the TaskSpec's taskConf over the conf passed to it. */
+  @Test (timeout = 5000)
+  public void testTaskConfUsage() throws Exception {
+    Configuration conf = new Configuration(false);
+    conf.set("global", "global1");
+    conf.set("global_override", "global1");
+    String[] localDirs = null;
+    Configuration taskConf = new Configuration(false);
+    taskConf.set("global_override", "task1");
+    taskConf.set("task", "task1");
+    // Overrides live only in taskConf, so the assertions fail unless the runner merges it in.
+    List<InputSpec> inputSpecList = new ArrayList<>();
+    List<OutputSpec> outputSpecList = new ArrayList<>();
+    TaskSpec taskSpec = new TaskSpec("dagName", "vertexName", 1, mock(ProcessorDescriptor.class),
+        inputSpecList, outputSpecList, null, taskConf);
+    TezTaskRunner2 taskRunner2 = new TezTaskRunner2(conf, mock(UserGroupInformation.class),
+        localDirs, taskSpec, 1, null, null, null, mock(TaskReporter.class), null, null, "pid",
+        null, 1000, false, new DefaultHadoopShim());
+    // Base-only key is kept; the task value wins on conflict; the task-only key is added.
+    Assert.assertEquals("global1", taskRunner2.task.getTaskConf().get("global"));
+    Assert.assertEquals("task1", taskRunner2.task.getTaskConf().get("global_override"));
+    Assert.assertEquals("task1", taskRunner2.task.getTaskConf().get("task"));
+  }
+
+
+}