You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/07/07 19:53:00 UTC

tez git commit: TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container). (hitesh)

Repository: tez
Updated Branches:
  refs/heads/branch-0.7 b06afabb9 -> 8eb707e4e


TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container). (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/8eb707e4
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/8eb707e4
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/8eb707e4

Branch: refs/heads/branch-0.7
Commit: 8eb707e4e4ef8113d0265d21f1f0c0cb729e066f
Parents: b06afab
Author: Hitesh Shah <hi...@apache.org>
Authored: Thu Jul 7 12:52:31 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Thu Jul 7 12:52:31 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/api/TezConfiguration.java    | 12 +--
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    | 10 ++-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  6 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 14 ++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    | 22 +++--
 .../tez/dag/app/dag/impl/TestVertexImpl2.java   | 18 +++-
 .../tez/dag/app/rm/TestContainerReuse.java      |  2 +-
 .../tez/mapreduce/output/TestMROutput.java      |  2 +-
 .../tez/mapreduce/processor/MapUtils.java       |  2 +-
 .../processor/reduce/TestReduceProcessor.java   |  2 +-
 .../runtime/LogicalIOProcessorRuntimeTask.java  |  6 ++
 .../apache/tez/runtime/api/impl/TaskSpec.java   | 53 ++++++++++--
 .../apache/tez/runtime/task/TezTaskRunner.java  | 17 +++-
 .../TestLogicalIOProcessorRuntimeTask.java      |  2 +-
 .../tez/runtime/api/impl/TestTaskSpec.java      | 91 ++++++++++++++++++++
 .../tez/runtime/task/TestTezTaskRunner.java     | 61 +++++++++++++
 17 files changed, 288 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8696a59..4626e93 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3286. Allow clients to set processor reserved memory per vertex (instead of per container).
   TEZ-3223. Support a NullHistoryLogger to disable history logging if needed.
   TEZ-3293. Fetch failures can cause a shuffle hang waiting for memory merge that never starts.
   TEZ-3305. TestAnalyzer fails on Hadoop 2.7.

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 3bbc0f9..a96d560 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -749,7 +749,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
-  @ConfigurationScope(Scope.AM)
+  @ConfigurationScope(Scope.VERTEX)
   public static final String TEZ_TASK_SCALE_MEMORY_ENABLED = TEZ_TASK_PREFIX
       + "scale.memory.enabled";
   @Private
@@ -760,7 +760,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
-  @ConfigurationScope(Scope.AM)
+  @ConfigurationScope(Scope.VERTEX)
   public static final String TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS = TEZ_TASK_PREFIX
       + "scale.memory.allocator.class";
   @Private
@@ -773,7 +773,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
-  @ConfigurationScope(Scope.AM)
+  @ConfigurationScope(Scope.VERTEX)
   public static final String TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION = TEZ_TASK_PREFIX
       + "scale.memory.reserve-fraction";
   @Private
@@ -785,7 +785,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
-  @ConfigurationScope(Scope.AM)
+  @ConfigurationScope(Scope.VERTEX)
   public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO =
       TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.per-io";
 
@@ -794,7 +794,7 @@ public class TezConfiguration extends Configuration {
   /**
    * Max cumulative total reservation for additional IOs.
    */
-  @ConfigurationScope(Scope.AM)
+  @ConfigurationScope(Scope.VERTEX)
   public static final String TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX =
       TEZ_TASK_PREFIX + "scale.memory.additional-reservation.fraction.max";
   /*
@@ -804,7 +804,7 @@ public class TezConfiguration extends Configuration {
    */
   @Private
   @Unstable
-  @ConfigurationScope(Scope.AM)
+  @ConfigurationScope(Scope.VERTEX)
   public static final String TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS =
       TEZ_TASK_PREFIX + "scale.memory.ratios";
 

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index 6b98a7b..b7ad3b1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -194,7 +194,12 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   private long cachedCountersTimestamp = 0;
   private Set<TezVertexID> reRunningVertices = new HashSet<TezVertexID>();
 
+  // Combined configs for the DAG
   private final Configuration dagConf;
+  // DAG specific configs only
+  // Useful when trying to serialize only the diff from global configs
+  private final Configuration dagOnlyConf;
+
   private final DAGPlan jobPlan;
 
   private final AtomicBoolean internalErrorTriggered = new AtomicBoolean(false);
@@ -502,6 +507,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     this.dagId = dagId;
     this.jobPlan = jobPlan;
     this.dagConf = new Configuration(amConf);
+    this.dagOnlyConf = new Configuration(false);
     Iterator<PlanKeyValuePair> iter =
         jobPlan.getDagConf().getConfKeyValuesList().iterator();
     // override the amConf by using DAG level configuration
@@ -509,6 +515,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       PlanKeyValuePair keyValPair = iter.next();
       TezConfiguration.validateProperty(keyValPair.getKey(), Scope.DAG);
       this.dagConf.set(keyValPair.getKey(), keyValPair.getValue());
+      this.dagOnlyConf.set(keyValPair.getKey(), keyValPair.getValue());
     }
     this.dagName = (jobPlan.getName() != null) ? jobPlan.getName() : "<missing app name>";
     this.userName = appUserName;
@@ -1649,7 +1656,8 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         dag.eventHandler, dag.taskAttemptListener,
         dag.clock, dag.taskHeartbeatHandler,
         !dag.commitAllOutputsOnSuccess, dag.appContext, vertexLocationHint,
-        dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker);
+        dag.vertexGroups, dag.taskSpecificLaunchCmdOption, dag.entityUpdateTracker,
+        dag.dagOnlyConf);
     return v;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 4a0742f..2dfd7f2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -830,9 +830,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     TezTaskAttemptID attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
     TaskSpec taskSpec = null;
     if (baseTaskSpec != null) {
-      taskSpec = new TaskSpec(attemptId, baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
+      taskSpec = new TaskSpec(attemptId,
+          baseTaskSpec.getDAGName(), baseTaskSpec.getVertexName(),
           baseTaskSpec.getVertexParallelism(), baseTaskSpec.getProcessorDescriptor(),
-          baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs());
+          baseTaskSpec.getInputs(), baseTaskSpec.getOutputs(), baseTaskSpec.getGroupInputs(),
+          baseTaskSpec.getTaskConf());
     }
     return new TaskAttemptImpl(attemptId, eventHandler,
         taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 0d6bc68..1e875d2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -231,7 +231,12 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   private long cachedCountersTimestamp = 0;
   private Resource taskResource;
 
+  // Merged/combined vertex level config
   private Configuration vertexConf;
+  // Vertex-specific configs only (also includes the DAG-specific configs)
+  // Useful when trying to serialize only the diff from global configs
+  @VisibleForTesting
+  Configuration vertexOnlyConf;
   
   private final boolean isSpeculationEnabled;
 
@@ -875,19 +880,22 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       TaskHeartbeatHandler thh, boolean commitVertexOutputs,
       AppContext appContext, VertexLocationHint vertexLocationHint,
       Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption,
-      StateChangeNotifier entityStatusTracker) {
+      StateChangeNotifier entityStatusTracker, Configuration dagOnlyConf) {
     this.vertexId = vertexId;
     this.vertexPlan = vertexPlan;
     this.vertexName = StringInterner.weakIntern(vertexName);
     this.vertexConf = new Configuration(dagConf);
-    // override dag configuration by using vertex's specified configuration
+    this.vertexOnlyConf = new Configuration(dagOnlyConf);
     if (vertexPlan.hasVertexConf()) {
       ConfigurationProto confProto = vertexPlan.getVertexConf();
       for (PlanKeyValuePair keyValuePair : confProto.getConfKeyValuesList()) {
         TezConfiguration.validateProperty(keyValuePair.getKey(), Scope.VERTEX);
         vertexConf.set(keyValuePair.getKey(), keyValuePair.getValue());
+        vertexOnlyConf.set(keyValuePair.getKey(), keyValuePair.getValue());
       }
     }
+
+
     this.clock = clock;
     this.appContext = appContext;
     this.commitVertexOutputs = commitVertexOutputs;
@@ -1538,7 +1546,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     return TaskSpec.createBaseTaskSpec(getDAG().getName(),
         getName(), getTotalTasks(), getProcessorDescriptor(),
         getInputSpecList(taskIndex), getOutputSpecList(taskIndex), 
-        getGroupInputSpecList(taskIndex));
+        getGroupInputSpecList(taskIndex), vertexOnlyConf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index f0a8625..520d10f 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -2258,6 +2258,8 @@ public class TestVertexImpl {
     LOG.info("Setting up vertices from dag plan, verticesCnt=" + vCnt);
     vertices = new HashMap<String, VertexImpl>();
     vertexIdMap = new HashMap<TezVertexID, VertexImpl>();
+    Configuration dagConf = new Configuration(false);
+    dagConf.set("abc", "foobar");
     for (int i = 0; i < vCnt; ++i) {
       VertexPlan vPlan = dagPlan.getVertex(i);
       String vName = vPlan.getName();
@@ -2269,17 +2271,18 @@ public class TestVertexImpl {
         if (customInitializer == null) {
           v = new VertexImplWithControlledInitializerManager(vertexId, vPlan, vPlan.getName(), conf,
               dispatcher.getEventHandler(), taskAttemptListener,
-              clock, thh, appContext, locationHint, dispatcher, updateTracker);
+              clock, thh, appContext, locationHint, dispatcher, updateTracker, dagConf);
         } else {
           v = new VertexImplWithRunningInputInitializer(vertexId, vPlan, vPlan.getName(), conf,
               dispatcher.getEventHandler(), taskAttemptListener,
-              clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker);
+              clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker,
+              dagConf);
         }
       } else {
         v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
             dispatcher.getEventHandler(), taskAttemptListener,
             clock, thh, true, appContext, locationHint, vertexGroups, taskSpecificLaunchCmdOption,
-            updateTracker);
+            updateTracker, dagConf);
       }
       vertices.put(vName, v);
       vertexIdMap.put(vertexId, v);
@@ -3120,6 +3123,7 @@ public class TestVertexImpl {
     TaskEventScheduleTask event = (TaskEventScheduleTask) taskEventDispatcher.events.get(0);
     Assert.assertEquals(mockLocation, event.getTaskLocationHint());
     Assert.assertNotNull(event.getBaseTaskSpec());
+    Assert.assertEquals("foobar", event.getBaseTaskSpec().getTaskConf().get("abc"));
   }
   
   @Test(timeout = 5000)
@@ -5417,7 +5421,7 @@ public class TestVertexImpl {
       VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
           dispatcher.getEventHandler(), taskAttemptListener,
           clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
-          updateTracker);
+          updateTracker, new Configuration(false));
       v.setInputVertices(new HashMap());
       vertexIdMap.put(vId, v);
       vertices.put(v.getName(), v);
@@ -5452,11 +5456,12 @@ public class TestVertexImpl {
                                                  VertexLocationHint vertexLocationHint,
                                                  DrainDispatcher dispatcher,
                                                  InputInitializer presetInitializer,
-                                                 StateChangeNotifier updateTracker) {
+                                                 StateChangeNotifier updateTracker,
+                                                 Configuration dagConf) {
       super(vertexId, vertexPlan, vertexName, conf, eventHandler,
           taskAttemptListener, clock, thh, true,
           appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
-          updateTracker);
+          updateTracker, dagConf);
       this.presetInitializer = presetInitializer;
     }
 
@@ -5491,11 +5496,12 @@ public class TestVertexImpl {
                                                       AppContext appContext,
                                                       VertexLocationHint vertexLocationHint,
                                                       DrainDispatcher dispatcher,
-                                                      StateChangeNotifier updateTracker) {
+                                                      StateChangeNotifier updateTracker,
+                                                      Configuration dagConf) {
       super(vertexId, vertexPlan, vertexName, conf, eventHandler,
           taskAttemptListener, clock, thh, true,
           appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
-          updateTracker);
+          updateTracker, dagConf);
       this.dispatcher = dispatcher;
     }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
index b4064a0..483c3a5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
@@ -39,6 +39,8 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
@@ -271,8 +273,14 @@ public class TestVertexImpl2 {
       doReturn(new Credentials()).when(mockDag).getCredentials();
       doReturn(mockDag).when(mockAppContext).getCurrentDAG();
 
+      ConfigurationProto confProto = ConfigurationProto.newBuilder()
+          .addConfKeyValues(PlanKeyValuePair.newBuilder().setKey("foo").setValue("bar").build())
+          .addConfKeyValues(PlanKeyValuePair.newBuilder().setKey("foo1").setValue("bar2").build())
+          .build();
+
       vertexPlan = DAGProtos.VertexPlan.newBuilder()
           .setName(vertexName)
+          .setVertexConf(confProto)
           .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
               .setJavaOpts(initialJavaOpts)
               .setNumTasks(numTasks)
@@ -286,12 +294,20 @@ public class TestVertexImpl2 {
               .build())
           .setType(DAGProtos.PlanVertexType.NORMAL).build();
 
+      Configuration dagConf = new Configuration(false);
+      dagConf.set("abc1", "xyz1");
+      dagConf.set("foo1", "bar1");
+
       vertex =
           new VertexImpl(TezVertexID.fromString("vertex_1418197758681_0001_1_00"), vertexPlan,
               "testvertex", conf, mock(EventHandler.class), mock(TaskAttemptListener.class),
               mock(Clock.class), mock(TaskHeartbeatHandler.class), false, mockAppContext,
               VertexLocationHint.create(new LinkedList<TaskLocationHint>()), null,
-              new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class));
+              new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class), dagConf);
+
+      assertEquals("xyz1", vertex.vertexOnlyConf.get("abc1"));
+      assertEquals("bar2", vertex.vertexOnlyConf.get("foo1"));
+      assertEquals("bar", vertex.vertexOnlyConf.get("foo"));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 7ad044b..d944e5d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -1455,7 +1455,7 @@ public class TestContainerReuse {
       Collections.singletonList(new InputSpec("vertexName",
           InputDescriptor.create("inputClassName"), 1)),
       Collections.singletonList(new OutputSpec("vertexName",
-          OutputDescriptor.create("outputClassName"), 1)), null), ta, locationHint,
+          OutputDescriptor.create("outputClassName"), 1)), null, null), ta, locationHint,
       priority.getPriority(), containerContext);
     return lr;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
index 45a24cb..bfeaad8 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMROutput.java
@@ -191,7 +191,7 @@ public class TestMROutput {
         dagName, vertexName, -1,
         procDesc,
         inputSpecs,
-        outputSpecs, null);
+        outputSpecs, null, null);
 
     FileSystem fs = FileSystem.getLocal(conf);
     Path workDir =

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 8841882..0d0eace 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -212,7 +212,7 @@ public class MapUtils {
         dagName, vertexName, -1,
         mapProcessorDesc,
         inputSpecs,
-        outputSpecs, null);
+        outputSpecs, null, null);
 
     Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
     serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index fcb42b3..21d2929 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -204,7 +204,7 @@ public class TestReduceProcessor {
         reduceVertexName, -1,
         reduceProcessorDesc,
         Collections.singletonList(reduceInputSpec),
-        Collections.singletonList(reduceOutputSpec), null);
+        Collections.singletonList(reduceOutputSpec), null, null);
 
     Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
     serviceConsumerMetadata.put(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID,

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
index 11ef31d..70e2a18 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java
@@ -967,4 +967,10 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask {
     return this.outputsMap;
   }
 
+  @Private
+  @VisibleForTesting
+  public Configuration getTaskConf() {
+    return tezConf;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
index 4dc57e2..e082bf8 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TaskSpec.java
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.List;
 
 import javax.annotation.Nullable;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.StringInterner;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -42,6 +44,7 @@ public class TaskSpec implements Writable {
   private List<OutputSpec> outputSpecList;
   private List<GroupInputSpec> groupInputSpecList;
   private int vertexParallelism = -1;
+  private Configuration taskConf;
 
   public TaskSpec() {
   }
@@ -49,17 +52,27 @@ public class TaskSpec implements Writable {
   public static TaskSpec createBaseTaskSpec(String dagName, String vertexName,
       int vertexParallelism, ProcessorDescriptor processorDescriptor,
       List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
-      @Nullable List<GroupInputSpec> groupInputSpecList) {
+      @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) {
     return new TaskSpec(dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList,
-        outputSpecList, groupInputSpecList);
+        outputSpecList, groupInputSpecList, taskConf);
   }
 
   public TaskSpec(
       String dagName, String vertexName,
       int vertexParallelism,
       ProcessorDescriptor processorDescriptor,
-      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, 
+      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
       @Nullable List<GroupInputSpec> groupInputSpecList) {
+    this(dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList,
+        outputSpecList, groupInputSpecList, null);
+  }
+
+  public TaskSpec(
+      String dagName, String vertexName,
+      int vertexParallelism,
+      ProcessorDescriptor processorDescriptor,
+      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+      @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) {
     checkNotNull(dagName, "dagName is null");
     checkNotNull(vertexName, "vertexName is null");
     checkNotNull(processorDescriptor, "processorDescriptor is null");
@@ -73,14 +86,25 @@ public class TaskSpec implements Writable {
     this.outputSpecList = outputSpecList;
     this.groupInputSpecList = groupInputSpecList;
     this.vertexParallelism = vertexParallelism;
+    this.taskConf = taskConf;
+  }
+
+  public TaskSpec(TezTaskAttemptID taskAttemptID,
+                  String dagName, String vertexName,
+                  int vertexParallelism,
+                  ProcessorDescriptor processorDescriptor,
+                  List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+                  @Nullable List<GroupInputSpec> groupInputSpecList) {
+    this(taskAttemptID, dagName, vertexName, vertexParallelism, processorDescriptor, inputSpecList,
+        outputSpecList, groupInputSpecList, null);
   }
 
   public TaskSpec(TezTaskAttemptID taskAttemptID,
       String dagName, String vertexName,
       int vertexParallelism,
       ProcessorDescriptor processorDescriptor,
-      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList, 
-      @Nullable List<GroupInputSpec> groupInputSpecList) {
+      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList,
+      @Nullable List<GroupInputSpec> groupInputSpecList, Configuration taskConf) {
     checkNotNull(taskAttemptID, "taskAttemptID is null");
     checkNotNull(dagName, "dagName is null");
     checkNotNull(vertexName, "vertexName is null");
@@ -95,6 +119,7 @@ public class TaskSpec implements Writable {
     this.outputSpecList = outputSpecList;
     this.groupInputSpecList = groupInputSpecList;
     this.vertexParallelism = vertexParallelism;
+    this.taskConf = taskConf;
   }
 
   public String getDAGName() {
@@ -129,6 +154,10 @@ public class TaskSpec implements Writable {
     return groupInputSpecList;
   }
 
+  public Configuration getTaskConf() {
+    return taskConf;
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     taskAttemptId.write(out);
@@ -153,6 +182,12 @@ public class TaskSpec implements Writable {
     } else {
       out.writeBoolean(false);
     }
+    if (taskConf != null) {
+      out.writeBoolean(true);
+      taskConf.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
   }
 
   @Override
@@ -188,6 +223,11 @@ public class TaskSpec implements Writable {
         groupInputSpecList.add(group);
       }
     }
+    boolean hasVertexConf = in.readBoolean();
+    if (hasVertexConf) {
+      taskConf = new Configuration(false);
+      taskConf.readFields(in);
+    }
   }
 
   @Override
@@ -216,6 +256,9 @@ public class TaskSpec implements Writable {
       }
       sb.append("]");
     }
+    if (taskConf != null) {
+      sb.append(", taskConfEntryCount=" + taskConf.size());
+    }
     return sb.toString();
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
index de83889..c4cae15 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner.java
@@ -23,7 +23,9 @@ import java.lang.reflect.UndeclaredThrowableException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -44,6 +46,7 @@ import org.apache.tez.runtime.api.impl.TezUmbilical;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Multimap;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -53,11 +56,13 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
   private static final Logger LOG = LoggerFactory.getLogger(TezTaskRunner.class);
 
   private final Configuration tezConf;
-  private final LogicalIOProcessorRuntimeTask task;
+  @VisibleForTesting
+  final LogicalIOProcessorRuntimeTask task;
   private final UserGroupInformation ugi;
 
   private final TaskReporter taskReporter;
   private final ListeningExecutorService executor;
+  final Configuration taskConf;
   private volatile ListenableFuture<Void> taskFuture;
   private volatile Thread waitingThread;
   private volatile Throwable firstException;
@@ -78,7 +83,15 @@ public class TezTaskRunner implements TezUmbilical, ErrorReporter {
     this.ugi = ugi;
     this.taskReporter = taskReporter;
     this.executor = executor;
-    task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, tezConf, localDirs, this,
+    this.taskConf = new Configuration(tezConf);
+    if (taskSpec.getTaskConf() != null) {
+      Iterator<Entry<String, String>> iter = taskSpec.getTaskConf().iterator();
+      while (iter.hasNext()) {
+        Entry<String, String> entry = iter.next();
+        taskConf.set(entry.getKey(), entry.getValue());
+      }
+    }
+    task = new LogicalIOProcessorRuntimeTask(taskSpec, appAttemptNumber, taskConf, localDirs, this,
         serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, objectRegistry, pid,
         executionContext, memAvailable);
     taskRunning = new AtomicBoolean(false);

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
index 520ad69..08e909e 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java
@@ -192,7 +192,7 @@ public class TestLogicalIOProcessorRuntimeTask {
     ProcessorDescriptor processorDesc = createProcessorDescriptor();
     TaskSpec taskSpec = new TaskSpec(taskAttemptID,
         dagName, vertexName, parallelism, processorDesc,
-        createInputSpecList(), createOutputSpecList(), null);
+        createInputSpecList(), createOutputSpecList(), null, null);
     return taskSpec;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java
new file mode 100644
index 0000000..f3cb49b
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/api/impl/TestTaskSpec.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.api.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTaskSpec {
+  /** Writes a TaskSpec, reads it back, and compares selected fields including the task conf. */
+  @Test (timeout = 5000)
+  public void testSerDe() throws IOException {
+    ByteBuffer payload = null;
+    ProcessorDescriptor procDesc = ProcessorDescriptor.create("proc").setUserPayload(
+        UserPayload.create(payload)).setHistoryText("historyText");
+    // One input spec, one output spec, and no group input specs.
+    List<InputSpec> inputSpecs = new ArrayList<InputSpec>();
+    InputSpec inputSpec = new InputSpec("src1", InputDescriptor.create("inputClass"),10);
+    inputSpecs.add(inputSpec);
+    List<OutputSpec> outputSpecs = new ArrayList<OutputSpec>();
+    OutputSpec outputSpec = new OutputSpec("dest1", OutputDescriptor.create("outputClass"), 999);
+    outputSpecs.add(outputSpec);
+    List<GroupInputSpec> groupInputSpecs = null;
+    // Task-level conf with a single key that is checked after deserialization.
+    Configuration taskConf = new Configuration(false);
+    taskConf.set("foo", "bar");
+
+    TezTaskAttemptID taId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(
+        TezVertexID.getInstance(TezDAGID.getInstance("1234", 1, 1), 1), 1), 1);
+    TaskSpec taskSpec = new TaskSpec(taId, "dagName", "vName", -1, procDesc, inputSpecs, outputSpecs,
+        groupInputSpecs, taskConf);
+    // Serialize the spec into an in-memory buffer.
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(bos);
+    taskSpec.write(out);
+    // Deserialize into a fresh TaskSpec from the same bytes.
+    TaskSpec deSerTaskSpec = new TaskSpec();
+    ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+    DataInput in = new DataInputStream(bis);
+    deSerTaskSpec.readFields(in);
+    // The deserialized spec must match the original on each compared field.
+    Assert.assertEquals(taskSpec.getDAGName(), deSerTaskSpec.getDAGName());
+    Assert.assertEquals(taskSpec.getVertexName(), deSerTaskSpec.getVertexName());
+    Assert.assertEquals(taskSpec.getVertexParallelism(), deSerTaskSpec.getVertexParallelism());
+    Assert.assertEquals(taskSpec.getInputs().size(), deSerTaskSpec.getInputs().size());
+    Assert.assertEquals(taskSpec.getOutputs().size(), deSerTaskSpec.getOutputs().size());
+    Assert.assertNull(deSerTaskSpec.getGroupInputs());
+    Assert.assertEquals(taskSpec.getInputs().get(0).getSourceVertexName(),
+        deSerTaskSpec.getInputs().get(0).getSourceVertexName());
+    Assert.assertEquals(taskSpec.getOutputs().get(0).getDestinationVertexName(),
+        deSerTaskSpec.getOutputs().get(0).getDestinationVertexName());
+    // The task conf entry must survive the round trip.
+    Assert.assertEquals(taskConf.get("foo"), deSerTaskSpec.getTaskConf().get("foo"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8eb707e4/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner.java
new file mode 100644
index 0000000..c943e02
--- /dev/null
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestTezTaskRunner.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.task;
+
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.runtime.api.impl.InputSpec;
+import org.apache.tez.runtime.api.impl.OutputSpec;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTezTaskRunner {
+  /** Checks that conf entries carried by the TaskSpec are layered over the runner's base conf. */
+  @Test (timeout = 5000)
+  public void testTaskConfUsage() throws Exception {
+    Configuration conf = new Configuration(false);
+    conf.set("global", "global1");
+    conf.set("global_override", "global1");
+    String[] localDirs = null;
+    Configuration taskConf = new Configuration(false);
+    taskConf.set("global_override", "task1"); // must win over the base conf's "global1"
+    taskConf.set("task", "task1"); // present only in the task-level conf
+    // Minimal spec: empty input/output lists and a mocked processor descriptor.
+    List<InputSpec> inputSpecList = new ArrayList<InputSpec>();
+    List<OutputSpec> outputSpecList = new ArrayList<OutputSpec>();
+    TaskSpec taskSpec = new TaskSpec("dagName", "vertexName", 1, mock(ProcessorDescriptor.class),
+        inputSpecList, outputSpecList, null, taskConf);
+    TezTaskRunner taskRunner = new TezTaskRunner(conf, mock(UserGroupInformation.class),
+        localDirs, taskSpec, 1, null, null, null, mock(TaskReporter.class), null, null, "pid",
+        null, 1000);
+    // The runner's task must see the base-only, overridden, and task-only keys.
+    Assert.assertEquals("global1", taskRunner.task.getTaskConf().get("global"));
+    Assert.assertEquals("task1", taskRunner.task.getTaskConf().get("global_override"));
+    Assert.assertEquals("task1", taskRunner.task.getTaskConf().get("task"));
+  }
+
+
+}