You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/23 19:45:24 UTC

[10/20] git commit: Rename *.new* packages back to what they should be, remove dead code from the old packages - mapreduce module - tez-engine module (part of TEZ-398). (sseth)

 Rename *.new* packages back to what they should be, remove dead code
 from the old packages - mapreduce module - tez-engine module (part of
 TEZ-398). (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b4950f98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b4950f98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b4950f98

Branch: refs/heads/TEZ-398
Commit: b4950f98a7a0c62d4c6d53ab75bfb857e8f6b551
Parents: 3d60945
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Sep 23 10:44:14 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Sep 23 10:44:14 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   |   24 +-
 .../dag/app/TaskAttemptListenerImpTezDag.java   |    6 +-
 .../java/org/apache/tez/dag/app/dag/Task.java   |    2 +-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |    4 +-
 .../dag/event/TaskAttemptEventStatusUpdate.java |    2 +-
 .../dag/app/dag/event/TaskEventAddTezEvent.java |    2 +-
 .../app/dag/event/VertexEventRouteEvent.java    |    2 +-
 .../org/apache/tez/dag/app/dag/impl/Edge.java   |   10 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |    4 +-
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |    2 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |   12 +-
 .../app/rm/AMSchedulerEventTALaunchRequest.java |    2 +-
 .../rm/container/AMContainerEventAssignTA.java  |    2 +-
 .../dag/app/rm/container/AMContainerImpl.java   |    2 +-
 .../dag/app/rm/container/AMContainerTask.java   |    2 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |    2 +-
 .../tez/dag/app/rm/TestContainerReuse.java      |    7 +-
 .../org/apache/tez/common/ContainerTask.java    |    2 +-
 .../apache/tez/common/RunningTaskContext.java   |   70 -
 .../apache/tez/common/TezEngineTaskContext.java |  141 --
 .../org/apache/tez/common/TezTaskReporter.java  |  116 --
 .../tez/common/TezTaskUmbilicalProtocol.java    |   14 +-
 .../org/apache/tez/engine/api/KVReader.java     |   81 +
 .../org/apache/tez/engine/api/KVWriter.java     |   40 +
 .../api/events/TaskAttemptCompletedEvent.java   |   28 +
 .../api/events/TaskAttemptFailedEvent.java      |   35 +
 .../api/events/TaskStatusUpdateEvent.java       |   70 +
 .../tez/engine/api/impl/EventMetaData.java      |  152 ++
 .../apache/tez/engine/api/impl/EventType.java   |   29 +
 .../apache/tez/engine/api/impl/InputSpec.java   |   88 +
 .../apache/tez/engine/api/impl/OutputSpec.java  |   87 +
 .../apache/tez/engine/api/impl/TaskSpec.java    |  146 ++
 .../apache/tez/engine/api/impl/TezEvent.java    |  248 +++
 .../engine/api/impl/TezHeartbeatRequest.java    |  137 ++
 .../engine/api/impl/TezHeartbeatResponse.java   |  105 ++
 .../engine/api/impl/TezInputContextImpl.java    |   84 +
 .../engine/api/impl/TezOutputContextImpl.java   |   85 +
 .../api/impl/TezProcessorContextImpl.java       |   86 +
 .../tez/engine/api/impl/TezTaskContextImpl.java |  145 ++
 .../tez/engine/api/impl/TezUmbilical.java       |   36 +
 .../broadcast/input/BroadcastInputManager.java  |    2 +-
 .../broadcast/input/BroadcastKVReader.java      |    2 +-
 .../broadcast/output/FileBasedKVWriter.java     |    4 +-
 .../tez/engine/common/TezEngineUtils.java       |    4 +-
 .../tez/engine/common/combine/CombineInput.java |  176 --
 .../engine/common/combine/CombineOutput.java    |   55 -
 .../common/localshuffle/LocalShuffle.java       |    4 +-
 .../engine/common/shuffle/impl/MapOutput.java   |    2 +-
 .../common/shuffle/impl/MergeManager.java       |    2 +-
 .../tez/engine/common/sort/SortingOutput.java   |   32 -
 .../engine/common/sort/impl/ExternalSorter.java |    2 +-
 .../common/task/impl/CombineValuesIterator.java |   51 -
 .../newoutput/TezLocalTaskOutputFiles.java      |  249 ---
 .../task/local/newoutput/TezTaskOutput.java     |  165 --
 .../local/newoutput/TezTaskOutputFiles.java     |  246 ---
 .../local/output/TezLocalTaskOutputFiles.java   |   58 +-
 .../common/task/local/output/TezTaskOutput.java |   45 +-
 .../task/local/output/TezTaskOutputFiles.java   |   65 +-
 .../engine/lib/input/ShuffledMergedInput.java   |    2 +-
 .../engine/lib/oldinput/LocalMergedInput.java   |   67 -
 .../lib/oldinput/OldShuffledMergedInput.java    |   74 -
 .../lib/oldoutput/OldInMemorySortedOutput.java  |   58 -
 .../oldoutput/OldLocalOnFileSorterOutput.java   |   38 -
 .../lib/oldoutput/OldOnFileSortedOutput.java    |   62 -
 .../engine/lib/output/InMemorySortedOutput.java |    2 +-
 .../lib/output/LocalOnFileSorterOutput.java     |    2 +-
 .../engine/lib/output/OnFileSortedOutput.java   |    2 +-
 .../lib/output/OnFileUnorderedKVOutput.java     |    2 +-
 .../org/apache/tez/engine/newapi/KVReader.java  |   79 -
 .../org/apache/tez/engine/newapi/KVWriter.java  |   38 -
 .../events/TaskAttemptCompletedEvent.java       |   28 -
 .../newapi/events/TaskAttemptFailedEvent.java   |   35 -
 .../newapi/events/TaskStatusUpdateEvent.java    |   70 -
 .../tez/engine/newapi/impl/EventMetaData.java   |  152 --
 .../tez/engine/newapi/impl/EventType.java       |   29 -
 .../tez/engine/newapi/impl/InputSpec.java       |   88 -
 .../tez/engine/newapi/impl/OutputSpec.java      |   87 -
 .../apache/tez/engine/newapi/impl/TaskSpec.java |  146 --
 .../apache/tez/engine/newapi/impl/TezEvent.java |  248 ---
 .../engine/newapi/impl/TezHeartbeatRequest.java |  137 --
 .../newapi/impl/TezHeartbeatResponse.java       |  105 --
 .../engine/newapi/impl/TezInputContextImpl.java |   84 -
 .../newapi/impl/TezOutputContextImpl.java       |   85 -
 .../newapi/impl/TezProcessorContextImpl.java    |   86 -
 .../engine/newapi/impl/TezTaskContextImpl.java  |  145 --
 .../tez/engine/newapi/impl/TezUmbilical.java    |   36 -
 .../LogicalIOProcessorRuntimeTask.java          |   20 +-
 .../tez/engine/newruntime/RuntimeTask.java      |    6 +-
 .../apache/tez/engine/runtime/RuntimeUtils.java |  164 --
 .../engine/shuffle/common/DiskFetchedInput.java |    2 +-
 .../org/apache/tez/engine/task/RuntimeTask.java |   92 -
 .../mapred/LocalClientProtocolProviderTez.java  |  108 +-
 .../hadoop/mapred/LocalJobRunnerMetricsTez.java |  196 +-
 .../apache/hadoop/mapred/LocalJobRunnerTez.java | 1753 +++++++++---------
 .../apache/tez/mapreduce/input/SimpleInput.java |    2 +-
 .../tez/mapreduce/output/SimpleOutput.java      |    2 +-
 .../mapreduce/processor/map/MapProcessor.java   |    4 +-
 .../processor/reduce/ReduceProcessor.java       |    4 +-
 .../tez/mapreduce/TestUmbilicalProtocol.java    |    4 +-
 .../tez/mapreduce/processor/MapUtils.java       |   43 +-
 .../processor/map/TestMapProcessor.java         |   45 +-
 .../processor/reduce/TestReduceProcessor.java   |   76 +-
 102 files changed, 2984 insertions(+), 4800 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 31898a3..f32fa6b 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -67,23 +67,23 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.engine.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.engine.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.engine.api.impl.EventMetaData;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.api.impl.TezHeartbeatResponse;
+import org.apache.tez.engine.api.impl.TezUmbilical;
+import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle;
 import org.apache.tez.engine.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.engine.common.objectregistry.ObjectRegistryModule;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.TokenCache;
-import org.apache.tez.engine.newapi.events.TaskAttemptCompletedEvent;
-import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
-import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
-import org.apache.tez.engine.newapi.impl.EventMetaData;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newapi.impl.InputSpec;
-import org.apache.tez.engine.newapi.impl.OutputSpec;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
-import org.apache.tez.engine.newapi.impl.TezEvent;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
-import org.apache.tez.engine.newapi.impl.TezUmbilical;
 import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.mapreduce.input.SimpleInputLegacy;
 import org.apache.tez.mapreduce.output.SimpleOutput;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 2be9c5f..36486c9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -52,10 +52,10 @@ import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.app.security.authorize.MRAMPolicyProvider;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.api.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.api.impl.TezHeartbeatResponse;
 import org.apache.tez.engine.common.security.JobTokenSecretManager;
-import org.apache.tez.engine.newapi.impl.TezEvent;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
 import org.apache.tez.engine.records.OutputContext;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
index 0947a41..088a195 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Task.java
@@ -26,7 +26,7 @@ import org.apache.tez.dag.api.oldrecords.TaskReport;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezEvent;
 
 /**
  * Read only view of Task.

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 85240e7..42ff8de 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -32,8 +32,8 @@ import org.apache.tez.dag.app.dag.impl.Edge;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.newapi.impl.InputSpec;
-import org.apache.tez.engine.newapi.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
index 3a8c489..0b8db76 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStatusUpdate.java
@@ -25,7 +25,7 @@ import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
+import org.apache.tez.engine.api.events.TaskStatusUpdateEvent;
 
 public class TaskAttemptEventStatusUpdate extends TaskAttemptEvent {
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java
index 51f6d53..4154bd0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventAddTezEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tez.dag.app.dag.event;
 
 import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezEvent;
 
 public class TaskEventAddTezEvent extends TaskEvent {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
index c851ae0..37478cb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/VertexEventRouteEvent.java
@@ -21,7 +21,7 @@ package org.apache.tez.dag.app.dag.event;
 import java.util.List;
 
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezEvent;
 
 public class VertexEventRouteEvent extends VertexEvent {
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index d565978..3605857 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -31,14 +31,14 @@ import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.api.impl.EventMetaData;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TezEvent;
+import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
 import org.apache.tez.engine.newapi.events.DataMovementEvent;
 import org.apache.tez.engine.newapi.events.InputFailedEvent;
 import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
-import org.apache.tez.engine.newapi.impl.EventMetaData;
-import org.apache.tez.engine.newapi.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newapi.impl.InputSpec;
-import org.apache.tez.engine.newapi.impl.OutputSpec;
-import org.apache.tez.engine.newapi.impl.TezEvent;
 
 public class Edge {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 1ae9dcd..f2b2776 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -89,8 +89,8 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
-import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.engine.api.impl.TaskSpec;
 
 import com.google.common.annotations.VisibleForTesting;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 92a1859..13fa915 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -75,7 +75,7 @@ import org.apache.tez.dag.history.events.TaskStartedEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.api.impl.TezEvent;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
 import com.google.common.annotations.VisibleForTesting;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 0bcba4f..1ec1225 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -106,14 +106,14 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.engine.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.engine.api.impl.EventMetaData;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TezEvent;
 import org.apache.tez.engine.newapi.events.DataMovementEvent;
 import org.apache.tez.engine.newapi.events.InputFailedEvent;
-import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
-import org.apache.tez.engine.newapi.events.TaskStatusUpdateEvent;
-import org.apache.tez.engine.newapi.impl.EventMetaData;
-import org.apache.tez.engine.newapi.impl.InputSpec;
-import org.apache.tez.engine.newapi.impl.OutputSpec;
-import org.apache.tez.engine.newapi.impl.TezEvent;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
 import com.google.common.annotations.VisibleForTesting;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
index 1c30b0b..14edd96 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
 
 public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
index dd178fb..76e80f5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
@@ -19,7 +19,7 @@ package org.apache.tez.dag.app.rm.container;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
 
 public class AMContainerEventAssignTA extends AMContainerEvent {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index eccf92a..94dd580 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -54,7 +54,7 @@ import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
 import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 //import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
 
 @SuppressWarnings("rawtypes")
 public class AMContainerImpl implements AMContainer {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
index be1c08e..c0ef524 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
@@ -18,7 +18,7 @@
 
 package org.apache.tez.dag.app.rm.container;
 
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
 
 public class AMContainerTask {
   private final boolean shouldDie;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index b5e283b..676e747 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -82,7 +82,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 408f88a..3a6e008 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -57,9 +56,9 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.newapi.impl.InputSpec;
-import org.apache.tez.engine.newapi.impl.OutputSpec;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.InputSpec;
+import org.apache.tez.engine.api.impl.OutputSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java b/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
index 3c18d9f..e90f7fa 100644
--- a/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
@@ -22,7 +22,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.api.impl.TaskSpec;
 
 public class ContainerTask implements Writable {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java b/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java
deleted file mode 100644
index aac4095..0000000
--- a/tez-engine/src/main/java/org/apache/tez/common/RunningTaskContext.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.common;
-
-import java.io.IOException;
-
-import javax.crypto.SecretKey;
-
-import org.apache.hadoop.util.Progress;
-import org.apache.tez.engine.api.Partitioner;
-import org.apache.tez.engine.api.Processor;
-
-public class RunningTaskContext {
-  
-  protected SecretKey jobTokenSecret;
-  protected TezTaskReporter reporter;
-  protected Partitioner partitioner;
-  protected Processor combineProcessor;
-  protected TezTaskStatus status;
-  protected Progress progress = new Progress();
-
-  public Progress getProgress() {
-    return progress;
-  }
-
-  public void setJobTokenSecret(SecretKey jobTokenSecret) {
-    this.jobTokenSecret = jobTokenSecret;
-  }
-
-  public TezTaskStatus getStatus() {
-    return status;
-  }
-
-  public TezTaskReporter getTaskReporter() {
-    return reporter;
-  }
-
-  // TODO Doesn't belong here.
-  public Processor getCombineProcessor() {
-    return combineProcessor;
-  }
-
-  // TODO Doesn't belong here.
-  public Partitioner getPartitioner() {
-    return partitioner;
-  }
-
-  // TODO Doesn't belong here.
-  public SecretKey getJobTokenSecret() {
-    return jobTokenSecret;
-  }
-  
-  public void statusUpdate() throws IOException, InterruptedException {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java b/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
deleted file mode 100644
index c012928..0000000
--- a/tez-engine/src/main/java/org/apache/tez/common/TezEngineTaskContext.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.common;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public class TezEngineTaskContext extends TezTaskContext {
-
-  // These two could be replaced by a TezConfiguration / DagSpec.
-  private List<InputSpec> inputSpecList;
-  private List<OutputSpec> outputSpecList;
-  private ProcessorDescriptor processorDescriptor;
-  
-  public TezEngineTaskContext() {
-    super();
-  }
-
-  public TezEngineTaskContext(TezTaskAttemptID taskAttemptID, String user,
-      String jobName, String vertexName, ProcessorDescriptor processorDescriptor,
-      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
-    super(taskAttemptID, user, jobName, vertexName);
-    this.inputSpecList = inputSpecList;
-    this.outputSpecList = outputSpecList;
-    if (this.inputSpecList == null) {
-      inputSpecList = new ArrayList<InputSpec>(0);
-    }
-    if (this.outputSpecList == null) {
-      outputSpecList = new ArrayList<OutputSpec>(0);
-    }
-    this.inputSpecList = inputSpecList;
-    this.outputSpecList = outputSpecList;
-    this.processorDescriptor = processorDescriptor;
-  }
-
-  public String getRuntimeName() {
-    // FIXME. Add this to the DAG configuration, and fetch from there.
-    return "org.apache.tez.mapreduce.task.MRRuntimeTask";
-  }
-
-  public String getProcessorName() {
-    return processorDescriptor.getClassName();
-  }
-  
-  public byte[] getProcessorUserPayload() {
-    return processorDescriptor.getUserPayload();
-  }
-  
-  public List<InputSpec> getInputSpecList() {
-    return this.inputSpecList;
-  }
-  
-  public List<OutputSpec> getOutputSpecList() {
-    return this.outputSpecList;
-  }
-  
-  @Override
-  public void write(DataOutput out) throws IOException {
-    super.write(out);
-    byte[] procDesc = 
-        DagTypeConverters.convertToDAGPlan(processorDescriptor).toByteArray();
-    out.writeInt(procDesc.length);
-    out.write(procDesc);
-    out.writeInt(inputSpecList.size());
-    for (InputSpec inputSpec : inputSpecList) {
-      inputSpec.write(out);
-    }
-    out.writeInt(outputSpecList.size());
-    for (OutputSpec outputSpec : outputSpecList) {
-      outputSpec.write(out);
-    }
-  }
-  
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    super.readFields(in);
-    int procDescLength = in.readInt();
-    // TODO at least 3 buffer copies here. Need to convert this to full PB
-    // TEZ-305
-    byte[] procDescBytes = new byte[procDescLength];
-    in.readFully(procDescBytes);
-    processorDescriptor = DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
-        TezEntityDescriptorProto.parseFrom(procDescBytes)); 
-    int numInputSpecs = in.readInt();
-    inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
-    for (int i = 0; i < numInputSpecs; i++) {
-      InputSpec inputSpec = new InputSpec();
-      inputSpec.readFields(in);
-      inputSpecList.add(inputSpec);
-    }
-    int numOutputSpecs = in.readInt();
-    outputSpecList = new ArrayList<OutputSpec>(numOutputSpecs);
-    for (int i = 0; i < numOutputSpecs; i++) {
-      OutputSpec outputSpec = new OutputSpec();
-      outputSpec.readFields(in);
-      outputSpecList.add(outputSpec);
-    }
-  }
-
-  @Override
-  public String toString() {
-    StringBuffer sb = new StringBuffer();
-    sb.append("processorName=" + getProcessorName()
-        + ", inputSpecListSize=" + inputSpecList.size()
-        + ", outputSpecListSize=" + outputSpecList.size());
-    sb.append(", inputSpecList=[");
-    for (InputSpec i : inputSpecList) {
-      sb.append("{" + i.toString() + "}, ");
-    }
-    sb.append("], outputSpecList=[");
-    for (OutputSpec i : outputSpecList) {
-      sb.append("{" + i.toString() + "}, ");
-    }
-    sb.append("]");
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/common/TezTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezTaskReporter.java b/tez-engine/src/main/java/org/apache/tez/common/TezTaskReporter.java
deleted file mode 100644
index 1931e31..0000000
--- a/tez-engine/src/main/java/org/apache/tez/common/TezTaskReporter.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.common;
-
-import java.io.IOException;
-
-import org.apache.hadoop.ipc.ProtocolSignature;
-import org.apache.hadoop.util.Progressable;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
-
-public interface TezTaskReporter extends Progressable, Master {
-
-  public void setStatus(String status);
-
-  public float getProgress();
-
-  public void setProgress(float progress);
-  
-  public void progress();
-
-  public TezCounter getCounter(String group, String name);
-
-  public TezCounter getCounter(Enum<?> name);
-
-  public void incrCounter(String group, String counter, long amount);
-
-  public void incrCounter(Enum<?> key, long amount);
-
-  public void reportFatalError(TezTaskAttemptID taskAttemptId, 
-      Throwable exception, String logMsg);
-
-  public final TezTaskReporter NULL = new TezTaskReporter() {
-
-    @Override
-    public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
-        int fromEventIdx, int maxEventsToFetch,
-        TezTaskAttemptID reduce) {
-      return null;
-    }
-    
-    @Override
-    public void setStatus(String status) {
-    }
-    
-    @Override
-    public void setProgress(float progress) {
-    }
-    
-    @Override
-    public void progress() {
-    }
-    
-    @Override
-    public void incrCounter(Enum<?> key, long amount) {
-    }
-    
-    @Override
-    public void incrCounter(String group, String counter, long amount) {
-    }
-    
-    @Override
-    public float getProgress() {
-      return 0.0f;
-    }
-    
-    @Override
-    public TezCounter getCounter(Enum<?> name) {
-      return null;
-    }
-    
-    @Override
-    public TezCounter getCounter(String group, String name) {
-      return null;
-    }
-
-    @Override
-    public void reportFatalError(TezTaskAttemptID taskAttemptId,
-        Throwable exception, String logMsg) {
-      // TODO Auto-generated method stub
-      
-    }
-
-    @Override
-    public long getProtocolVersion(String protocol, long clientVersion)
-        throws IOException {
-      // TODO TEZAM3
-      return 0;
-    }
-
-    @Override
-    public ProtocolSignature getProtocolSignature(String protocol,
-        long clientVersion, int clientMethodsHash) throws IOException {
-      // TODO TEZAM3
-      return null;
-    }
-  };
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
index 28991a8..7d81b4c 100644
--- a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
+++ b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
@@ -22,13 +22,15 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.tez.common.records.ProceedToCompletionResponse;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
-import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
+import org.apache.tez.engine.api.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.api.impl.TezHeartbeatResponse;
 import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
 
 /** Protocol that task child process uses to contact its parent process.  The
  * parent is a daemon which which polls the central master for a new map or
@@ -36,7 +38,8 @@ import org.apache.tez.engine.records.OutputContext;
  * and parent is via this protocol. */
 @InterfaceAudience.Private
 @InterfaceStability.Stable
-public interface TezTaskUmbilicalProtocol extends Master {
+@ProtocolInfo(protocolName = "TezTaskUmbilicalProtocol", protocolVersion = 1)
+public interface TezTaskUmbilicalProtocol extends VersionedProtocol {
 
   public static final long versionID = 19L;
 
@@ -68,4 +71,7 @@ public interface TezTaskUmbilicalProtocol extends Master {
   public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
       throws IOException, TezException;
 
+  public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
+      int fromEventIdx, int maxEventsToFetch,
+      TezTaskAttemptID taskAttemptId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/KVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/api/KVReader.java
new file mode 100644
index 0000000..150b598
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/KVReader.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.io.IOException;
+
+import org.apache.tez.engine.newapi.Reader;
+
+/**
+ * A key/value(s) pair based {@link Reader}.
+ * 
+ * Example usage
+ * <code>
+ * while (kvReader.next()) {
+ *   KVRecord kvRecord = getCurrentKV();
+ *   Object key =  kvRecord.getKey();
+ *   Iterable values = kvRecord.getValues();
+ * </code>
+ *
+ */
+public interface KVReader extends Reader {
+
+  /**
+   * Moves to the next key/values(s) pair
+   * 
+   * @return true if another key/value(s) pair exists, false if there are no more.
+   * @throws IOException
+   *           if an error occurs
+   */
+  public boolean next() throws IOException;
+
+  /**
+   * Return the current key/value(s) pair. Use moveToNext() to advance.
+   * @return
+   * @throws IOException
+   */
+  public KVRecord getCurrentKV() throws IOException;
+  
+  // TODO NEWTEZ Move this to getCurrentKey and getCurrentValue independently. Otherwise usage pattern seems to be getCurrentKV.getKey, getCurrentKV.getValue
+  
+  // TODO NEWTEZ KVRecord which does not need to return a list!
+  // TODO NEWTEZ Parameterize this
+  /**
+   * Represents a key and an associated set of values
+   *
+   */
+  public static class KVRecord {
+
+    private Object key;
+    private Iterable<Object> values;
+
+    public KVRecord(Object key, Iterable<Object> values) {
+      this.key = key;
+      this.values = values;
+    }
+
+    public Object getKey() {
+      return this.key;
+    }
+
+    public Iterable<Object> getValues() {
+      return this.values;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/KVWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/KVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/api/KVWriter.java
new file mode 100644
index 0000000..079d488
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/KVWriter.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api;
+
+import java.io.IOException;
+
+import org.apache.tez.engine.newapi.Writer;
+
+/**
+ * A key/value(s) pair based {@link Writer}
+ */
+public interface KVWriter extends Writer {
+  /**
+   * Writes a key/value pair.
+   * 
+   * @param key
+   *          the key to write
+   * @param value
+   *          the value to write
+   * @throws IOException
+   *           if an error occurs
+   */
+  public void write(Object key, Object value) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptCompletedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptCompletedEvent.java
new file mode 100644
index 0000000..3a90f56
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptCompletedEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.events;
+
+import org.apache.tez.engine.newapi.Event;
+
+public class TaskAttemptCompletedEvent extends Event {
+
+  public TaskAttemptCompletedEvent() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptFailedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptFailedEvent.java
new file mode 100644
index 0000000..bd0bc04
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskAttemptFailedEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.events;
+
+import org.apache.tez.engine.newapi.Event;
+
+public class TaskAttemptFailedEvent extends Event {
+
+  private final String diagnostics;
+
+  public TaskAttemptFailedEvent(String diagnostics) {
+    this.diagnostics = diagnostics;
+  }
+
+  public String getDiagnostics() {
+    return diagnostics;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskStatusUpdateEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskStatusUpdateEvent.java
new file mode 100644
index 0000000..c0d77da
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/events/TaskStatusUpdateEvent.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.events;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.engine.newapi.Event;
+
+public class TaskStatusUpdateEvent extends Event implements Writable {
+
+  private TezCounters tezCounters;
+  private float progress;
+
+  public TaskStatusUpdateEvent() {
+  }
+
+  public TaskStatusUpdateEvent(TezCounters tezCounters, float progress) {
+    this.tezCounters = tezCounters;
+    this.progress = progress;
+  }
+
+  public TezCounters getCounters() {
+    return tezCounters;
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeFloat(progress);
+    if (tezCounters != null) {
+      out.writeBoolean(true);
+      tezCounters.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    progress = in.readFloat();
+    if (in.readBoolean()) {
+      tezCounters = new TezCounters();
+      tezCounters.readFields(in);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventMetaData.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventMetaData.java
new file mode 100644
index 0000000..64df7bb
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventMetaData.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Class that encapsulates all the information to identify the unique
+ * object that either generated an Event or is the recipient of an Event.
+ */
+public class EventMetaData implements Writable {
+
+  public static enum EventProducerConsumerType {
+    INPUT,
+    PROCESSOR,
+    OUTPUT,
+    SYSTEM
+  }
+
+  /**
+   * Producer Type ( one of Input/Output/Processor ) that generated the Event
+   * or Consumer Type that will consume the Event.
+   */
+  private EventProducerConsumerType producerConsumerType;
+
+  /**
+   * Name of the vertex where the event was generated.
+   */
+  private String taskVertexName;
+
+  /**
+   * Name of the vertex to which the Input or Output is connected to.
+   */
+  private String edgeVertexName;
+
+  /**
+   * i'th physical input/output that this event maps to.
+   */
+  private int index;
+
+  /**
+   * Task Attempt ID
+   */
+  private TezTaskAttemptID taskAttemptID;
+
+  public EventMetaData() {
+  }
+
+  public EventMetaData(EventProducerConsumerType generator,
+      String taskVertexName, String edgeVertexName,
+      TezTaskAttemptID taskAttemptID) {
+    this.producerConsumerType = generator;
+    this.taskVertexName = taskVertexName;
+    this.edgeVertexName = edgeVertexName;
+    this.taskAttemptID = taskAttemptID;
+  }
+
+  public EventProducerConsumerType getEventGenerator() {
+    return producerConsumerType;
+  }
+
+  public TezTaskAttemptID getTaskAttemptID() {
+    return taskAttemptID;
+  }
+
+  public String getTaskVertexName() {
+    return taskVertexName;
+  }
+
+  public String getEdgeVertexName() {
+    return edgeVertexName;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(producerConsumerType.ordinal());
+    if (taskVertexName != null) {
+      out.writeBoolean(true);
+      out.writeUTF(taskVertexName);
+    } else {
+      out.writeBoolean(false);
+    }
+    if (edgeVertexName != null) {
+      out.writeBoolean(true);
+      out.writeUTF(edgeVertexName);
+    } else {
+      out.writeBoolean(false);
+    }
+    if(taskAttemptID != null) {
+      out.writeBoolean(true);
+      taskAttemptID.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+    
+    out.writeInt(index);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    producerConsumerType = EventProducerConsumerType.values()[in.readInt()];
+    if (in.readBoolean()) {
+      taskVertexName = in.readUTF();
+    }
+    if (in.readBoolean()) {
+      edgeVertexName = in.readUTF();
+    }
+    if (in.readBoolean()) {
+      taskAttemptID = new TezTaskAttemptID();
+      taskAttemptID.readFields(in);
+    }
+    index = in.readInt();
+  }
+
+  public int getIndex() {
+    return index;
+  }
+
+  public void setIndex(int index) {
+    this.index = index;
+  }
+
+  @Override
+  public String toString() {
+    return "{ producerConsumerType=" + producerConsumerType
+        + ", taskVertexName=" + taskVertexName
+        + ", edgeVertexName=" + edgeVertexName
+        + ", taskAttemptId=" + taskAttemptID
+        + ", index=" + index + " }";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventType.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventType.java
new file mode 100644
index 0000000..52fc10d
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/EventType.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+public enum EventType {
+  TASK_ATTEMPT_COMPLETED_EVENT,
+  TASK_ATTEMPT_FAILED_EVENT,
+  DATA_MOVEMENT_EVENT,
+  INPUT_READ_ERROR_EVENT,
+  INPUT_FAILED_EVENT,
+  INTPUT_INFORMATION_EVENT,
+  TASK_STATUS_UPDATE_EVENT
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/InputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/InputSpec.java
new file mode 100644
index 0000000..a9ef333
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/InputSpec.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+
+public class InputSpec implements Writable {
+
+  private String sourceVertexName;
+  private InputDescriptor inputDescriptor;
+  private int physicalEdgeCount;
+
+  public InputSpec() {
+  }
+
+  public InputSpec(String sourceVertexName, InputDescriptor inputDescriptor,
+      int physicalEdgeCount) {
+    this.sourceVertexName = sourceVertexName;
+    this.inputDescriptor = inputDescriptor;
+    this.physicalEdgeCount = physicalEdgeCount;
+  }
+
+  public String getSourceVertexName() {
+    return sourceVertexName;
+  }
+
+  public InputDescriptor getInputDescriptor() {
+    return inputDescriptor;
+  }
+
+  public int getPhysicalEdgeCount() {
+    return physicalEdgeCount;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // TODONEWTEZ convert to PB
+    out.writeUTF(sourceVertexName);
+    out.writeInt(physicalEdgeCount);
+    byte[] inputDescBytes =
+        DagTypeConverters.convertToDAGPlan(inputDescriptor).toByteArray();
+    out.writeInt(inputDescBytes.length);
+    out.write(inputDescBytes);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    sourceVertexName = in.readUTF();
+    physicalEdgeCount = in.readInt();
+    int inputDescLen = in.readInt();
+    byte[] inputDescBytes = new byte[inputDescLen];
+    in.readFully(inputDescBytes);
+    inputDescriptor =
+        DagTypeConverters.convertInputDescriptorFromDAGPlan(
+            TezEntityDescriptorProto.parseFrom(inputDescBytes));
+  }
+
+  public String toString() {
+    return "{ sourceVertexName=" + sourceVertexName
+        + ", physicalEdgeCount" + physicalEdgeCount
+        + ", inputClassName=" + inputDescriptor.getClassName()
+        + " }";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/OutputSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/OutputSpec.java
new file mode 100644
index 0000000..3a1d5d8
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/OutputSpec.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+
+public class OutputSpec implements Writable {
+
+  private String destinationVertexName;
+  private OutputDescriptor outputDescriptor;
+  private int physicalEdgeCount;
+
+  public OutputSpec() {
+  }
+
+  public OutputSpec(String destinationVertexName,
+      OutputDescriptor outputDescriptor, int physicalEdgeCount) {
+    this.destinationVertexName = destinationVertexName;
+    this.outputDescriptor = outputDescriptor;
+    this.physicalEdgeCount = physicalEdgeCount;
+  }
+
+  public String getDestinationVertexName() {
+    return destinationVertexName;
+  }
+
+  public OutputDescriptor getOutputDescriptor() {
+    return outputDescriptor;
+  }
+
+  public int getPhysicalEdgeCount() {
+    return physicalEdgeCount;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // TODONEWTEZ convert to PB
+    out.writeUTF(destinationVertexName);
+    out.writeInt(physicalEdgeCount);
+    byte[] inputDescBytes =
+        DagTypeConverters.convertToDAGPlan(outputDescriptor).toByteArray();
+    out.writeInt(inputDescBytes.length);
+    out.write(inputDescBytes);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    destinationVertexName = in.readUTF();
+    physicalEdgeCount = in.readInt();
+    int inputDescLen = in.readInt();
+    byte[] inputDescBytes = new byte[inputDescLen];
+    in.readFully(inputDescBytes);
+    outputDescriptor =
+        DagTypeConverters.convertOutputDescriptorFromDAGPlan(
+            TezEntityDescriptorProto.parseFrom(inputDescBytes));
+  }
+
+  public String toString() {
+    return "{ destinationVertexName=" + destinationVertexName
+        + ", physicalEdgeCount" + physicalEdgeCount
+        + ", outputClassName=" + outputDescriptor.getClassName()
+        + " }";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TaskSpec.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TaskSpec.java
new file mode 100644
index 0000000..6527777
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TaskSpec.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public class TaskSpec implements Writable {
+
+  private TezTaskAttemptID taskAttemptId;
+  private String vertexName;
+  private String user;
+  private ProcessorDescriptor processorDescriptor;
+  private List<InputSpec> inputSpecList;
+  private List<OutputSpec> outputSpecList;
+
+  public TaskSpec() {
+  }
+
+  // TODO NEWTEZ Remove user
+  public TaskSpec(TezTaskAttemptID taskAttemptID, String user,
+      String vertexName, ProcessorDescriptor processorDescriptor,
+      List<InputSpec> inputSpecList, List<OutputSpec> outputSpecList) {
+    this.taskAttemptId = taskAttemptID;
+    this.vertexName = vertexName;
+    this.user = user;
+    this.processorDescriptor = processorDescriptor;
+    this.inputSpecList = inputSpecList;
+    this.outputSpecList = outputSpecList;
+  }
+
+  public String getVertexName() {
+    return vertexName;
+  }
+
+  public TezTaskAttemptID getTaskAttemptID() {
+    return taskAttemptId;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public ProcessorDescriptor getProcessorDescriptor() {
+    return processorDescriptor;
+  }
+
+  public List<InputSpec> getInputs() {
+    return inputSpecList;
+  }
+
+  public List<OutputSpec> getOutputs() {
+    return outputSpecList;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskAttemptId.write(out);
+    out.writeUTF(vertexName);
+    byte[] procDesc =
+        DagTypeConverters.convertToDAGPlan(processorDescriptor).toByteArray();
+    out.writeInt(procDesc.length);
+    out.write(procDesc);
+    out.writeInt(inputSpecList.size());
+    for (InputSpec inputSpec : inputSpecList) {
+      inputSpec.write(out);
+    }
+    out.writeInt(outputSpecList.size());
+    for (OutputSpec outputSpec : outputSpecList) {
+      outputSpec.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskAttemptId = new TezTaskAttemptID();
+    taskAttemptId.readFields(in);
+    vertexName = in.readUTF();
+    int procDescLength = in.readInt();
+    // TODO at least 3 buffer copies here. Need to convert this to full PB
+    // TEZ-305
+    byte[] procDescBytes = new byte[procDescLength];
+    in.readFully(procDescBytes);
+    processorDescriptor =
+        DagTypeConverters.convertProcessorDescriptorFromDAGPlan(
+            TezEntityDescriptorProto.parseFrom(procDescBytes));
+    int numInputSpecs = in.readInt();
+    inputSpecList = new ArrayList<InputSpec>(numInputSpecs);
+    for (int i = 0; i < numInputSpecs; i++) {
+      InputSpec inputSpec = new InputSpec();
+      inputSpec.readFields(in);
+      inputSpecList.add(inputSpec);
+    }
+    int numOutputSpecs = in.readInt();
+    outputSpecList = new ArrayList<OutputSpec>(numOutputSpecs);
+    for (int i = 0; i < numOutputSpecs; i++) {
+      OutputSpec outputSpec = new OutputSpec();
+      outputSpec.readFields(in);
+      outputSpecList.add(outputSpec);
+    }
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    sb.append("TaskAttemptID:" + taskAttemptId);
+    sb.append("processorName=" + processorDescriptor.getClassName()
+        + ", inputSpecListSize=" + inputSpecList.size()
+        + ", outputSpecListSize=" + outputSpecList.size());
+    sb.append(", inputSpecList=[");
+    for (InputSpec i : inputSpecList) {
+      sb.append("{" + i.toString() + "}, ");
+    }
+    sb.append("], outputSpecList=[");
+    for (OutputSpec i : outputSpecList) {
+      sb.append("{" + i.toString() + "}, ");
+    }
+    sb.append("]");
+    return sb.toString();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezEvent.java
new file mode 100644
index 0000000..9d0228d
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezEvent.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.engine.api.events.TaskAttemptFailedEvent;
+import org.apache.tez.engine.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.engine.api.events.EventProtos.DataMovementEventProto;
+import org.apache.tez.engine.api.events.EventProtos.InputFailedEventProto;
+import org.apache.tez.engine.api.events.EventProtos.InputInformationEventProto;
+import org.apache.tez.engine.api.events.EventProtos.InputReadErrorEventProto;
+import org.apache.tez.engine.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
+import org.apache.tez.engine.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.events.InputInformationEvent;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+
+import com.google.protobuf.ByteString;
+
+public class TezEvent implements Writable {
+
+  private EventType eventType;
+
+  private Event event;
+
+  private EventMetaData sourceInfo;
+
+  private EventMetaData destinationInfo;
+
+  public TezEvent() {
+  }
+
+  public TezEvent(Event event, EventMetaData sourceInfo) {
+    this.event = event;
+    this.setSourceInfo(sourceInfo);
+    if (event instanceof DataMovementEvent) {
+      eventType = EventType.DATA_MOVEMENT_EVENT;
+    } else if (event instanceof InputReadErrorEvent) {
+      eventType = EventType.INPUT_READ_ERROR_EVENT;
+    } else if (event instanceof TaskAttemptFailedEvent) {
+      eventType = EventType.TASK_ATTEMPT_FAILED_EVENT;
+    } else if (event instanceof TaskAttemptCompletedEvent) {
+      eventType = EventType.TASK_ATTEMPT_COMPLETED_EVENT;
+    } else if (event instanceof InputInformationEvent) {
+      eventType = EventType.INTPUT_INFORMATION_EVENT;
+    } else if (event instanceof InputFailedEvent) {
+      eventType = EventType.INPUT_FAILED_EVENT;
+    } else if (event instanceof TaskStatusUpdateEvent) {
+      eventType = EventType.TASK_STATUS_UPDATE_EVENT;
+    } else {
+      throw new TezUncheckedException("Unknown event, event="
+          + event.getClass().getName());
+    }
+  }
+
+  public Event getEvent() {
+    return event;
+  }
+
+  public EventMetaData getSourceInfo() {
+    return sourceInfo;
+  }
+
+  public void setSourceInfo(EventMetaData sourceInfo) {
+    this.sourceInfo = sourceInfo;
+  }
+
+  public EventMetaData getDestinationInfo() {
+    return destinationInfo;
+  }
+
+  public void setDestinationInfo(EventMetaData destinationInfo) {
+    this.destinationInfo = destinationInfo;
+  }
+
+  public EventType getEventType() {
+    return eventType;
+  }
+
+  private void serializeEvent(DataOutput out) throws IOException {
+    if (event == null) {
+      out.writeBoolean(false);
+      return;
+    }
+    out.writeBoolean(true);
+    out.writeInt(eventType.ordinal());
+    if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
+      // TODO NEWTEZ convert to PB
+      TaskStatusUpdateEvent sEvt = (TaskStatusUpdateEvent) event;
+      sEvt.write(out);
+    } else {
+      byte[] eventBytes = null;
+      switch (eventType) {
+      case DATA_MOVEMENT_EVENT:
+        DataMovementEvent dmEvt = (DataMovementEvent) event;
+        eventBytes = DataMovementEventProto.newBuilder()
+          .setSourceIndex(dmEvt.getSourceIndex())
+          .setTargetIndex(dmEvt.getTargetIndex())
+          .setUserPayload(ByteString.copyFrom(dmEvt.getUserPayload()))
+          .build().toByteArray();
+        break;
+      case INPUT_READ_ERROR_EVENT:
+        InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
+        eventBytes = InputReadErrorEventProto.newBuilder()
+            .setIndex(ideEvt.getIndex())
+            .setDiagnostics(ideEvt.getDiagnostics())
+            .build().toByteArray();
+        break;
+      case TASK_ATTEMPT_FAILED_EVENT:
+        TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
+        eventBytes = TaskAttemptFailedEventProto.newBuilder()
+            .setDiagnostics(tfEvt.getDiagnostics())
+            .build().toByteArray();
+        break;
+      case TASK_ATTEMPT_COMPLETED_EVENT:
+        eventBytes = TaskAttemptCompletedEventProto.newBuilder()
+            .build().toByteArray();
+        break;
+      case INPUT_FAILED_EVENT:
+        InputFailedEvent ifEvt = (InputFailedEvent) event;
+        eventBytes = InputFailedEventProto.newBuilder()
+            .setSourceIndex(ifEvt.getSourceIndex())
+            .setTargetIndex(ifEvt.getTargetIndex())
+            .setVersion(ifEvt.getVersion()).build().toByteArray();
+      case INTPUT_INFORMATION_EVENT:
+        InputInformationEvent iEvt = (InputInformationEvent) event;
+        eventBytes = InputInformationEventProto.newBuilder()
+            .setUserPayload(ByteString.copyFrom(iEvt.getUserPayload()))
+            .build().toByteArray();
+      default:
+        throw new TezUncheckedException("Unknown TezEvent"
+           + ", type=" + eventType);
+      }
+      out.writeInt(eventBytes.length);
+      out.write(eventBytes);
+    }
+  }
+
+  private void deserializeEvent(DataInput in) throws IOException {
+    if (!in.readBoolean()) {
+      event = null;
+      return;
+    }
+    eventType = EventType.values()[in.readInt()];
+    if (eventType.equals(EventType.TASK_STATUS_UPDATE_EVENT)) {
+      // TODO NEWTEZ convert to PB
+      event = new TaskStatusUpdateEvent();
+      ((TaskStatusUpdateEvent)event).readFields(in);
+    } else {
+      int eventBytesLen = in.readInt();
+      byte[] eventBytes = new byte[eventBytesLen];
+      in.readFully(eventBytes);
+      switch (eventType) {
+      case DATA_MOVEMENT_EVENT:
+        DataMovementEventProto dmProto =
+            DataMovementEventProto.parseFrom(eventBytes);
+        event = new DataMovementEvent(dmProto.getSourceIndex(),
+            dmProto.getTargetIndex(),
+            dmProto.getUserPayload().toByteArray());
+        break;
+      case INPUT_READ_ERROR_EVENT:
+        InputReadErrorEventProto ideProto =
+            InputReadErrorEventProto.parseFrom(eventBytes);
+        event = new InputReadErrorEvent(ideProto.getDiagnostics(),
+            ideProto.getIndex(), ideProto.getVersion());
+        break;
+      case TASK_ATTEMPT_FAILED_EVENT:
+        TaskAttemptFailedEventProto tfProto =
+            TaskAttemptFailedEventProto.parseFrom(eventBytes);
+        event = new TaskAttemptFailedEvent(tfProto.getDiagnostics());
+        break;
+      case TASK_ATTEMPT_COMPLETED_EVENT:
+        event = new TaskAttemptCompletedEvent();
+        break;
+      case INPUT_FAILED_EVENT:
+        InputFailedEventProto ifProto =
+            InputFailedEventProto.parseFrom(eventBytes);
+        event = new InputFailedEvent(ifProto.getSourceIndex(),
+            ifProto.getTargetIndex(), ifProto.getVersion());
+        break;
+      case INTPUT_INFORMATION_EVENT:
+        InputInformationEventProto infoProto =
+            InputInformationEventProto.parseFrom(eventBytes);
+        event = new InputInformationEvent(
+            infoProto.getUserPayload().toByteArray());
+        break;
+      default:
+        throw new TezUncheckedException("Unknown TezEvent"
+           + ", type=" + eventType);
+      }
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    serializeEvent(out);
+    if (sourceInfo != null) {
+      out.writeBoolean(true);
+      sourceInfo.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+    if (destinationInfo != null) {
+      out.writeBoolean(true);
+      destinationInfo.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    deserializeEvent(in);
+    if (in.readBoolean()) {
+      sourceInfo = new EventMetaData();
+      sourceInfo.readFields(in);
+    }
+    if (in.readBoolean()) {
+      destinationInfo = new EventMetaData();
+      destinationInfo.readFields(in);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java
new file mode 100644
index 0000000..dc1a447
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatRequest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+
+public class TezHeartbeatRequest implements Writable {
+
+  private String containerIdentifier;
+  private List<TezEvent> events;
+  private TezTaskAttemptID currentTaskAttemptID;
+  private int startIndex;
+  private int maxEvents;
+  private long requestId;
+
+  public TezHeartbeatRequest() {
+  }
+
+  public TezHeartbeatRequest(long requestId, List<TezEvent> events,
+      String containerIdentifier, TezTaskAttemptID taskAttemptID,
+      int startIndex, int maxEvents) {
+    this.containerIdentifier = containerIdentifier;
+    this.requestId = requestId;
+    this.events = Collections.unmodifiableList(events);
+    this.startIndex = startIndex;
+    this.maxEvents = maxEvents;
+    this.currentTaskAttemptID = taskAttemptID;
+  }
+
+  public String getContainerIdentifier() {
+    return containerIdentifier;
+  }
+
+  public List<TezEvent> getEvents() {
+    return events;
+  }
+
+  public int getStartIndex() {
+    return startIndex;
+  }
+
+  public int getMaxEvents() {
+    return maxEvents;
+  }
+
+  public long getRequestId() {
+    return requestId;
+  }
+
+  public TezTaskAttemptID getCurrentTaskAttemptID() {
+    return currentTaskAttemptID;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (events != null) {
+      out.writeBoolean(true);
+      out.writeInt(events.size());
+      for (TezEvent e : events) {
+        e.write(out);
+      }
+    } else {
+      out.writeBoolean(false);
+    }
+    if (currentTaskAttemptID != null) {
+      out.writeBoolean(true);
+      currentTaskAttemptID.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+    out.writeInt(startIndex);
+    out.writeInt(maxEvents);
+    out.writeLong(requestId);
+    Text.writeString(out, containerIdentifier);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (in.readBoolean()) {
+      int eventsCount = in.readInt();
+      events = new ArrayList<TezEvent>(eventsCount);
+      for (int i = 0; i < eventsCount; ++i) {
+        TezEvent e = new TezEvent();
+        e.readFields(in);
+        events.add(e);
+      }
+    }
+    if (in.readBoolean()) {
+      currentTaskAttemptID = new TezTaskAttemptID();
+      currentTaskAttemptID.readFields(in);
+    } else {
+      currentTaskAttemptID = null;
+    }
+    startIndex = in.readInt();
+    maxEvents = in.readInt();
+    requestId = in.readLong();
+    containerIdentifier = Text.readString(in);
+  }
+
+  @Override
+  public String toString() {
+    return "{ "
+        + " containerId=" + containerIdentifier
+        + ", requestId=" + requestId
+        + ", startIndex=" + startIndex
+        + ", maxEventsToGet=" + maxEvents
+        + ", taskAttemptId" + currentTaskAttemptID
+        + ", eventCount=" + (events != null ? events.size() : 0)
+        + " }";
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java
new file mode 100644
index 0000000..22ae7eb
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezHeartbeatResponse.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+
+public class TezHeartbeatResponse implements Writable {
+
+  private long lastRequestId;
+  private boolean shouldDie = false;
+  private List<TezEvent> events;
+
+  public TezHeartbeatResponse() {
+  }
+
+  public TezHeartbeatResponse(List<TezEvent> events) {
+    this.events = Collections.unmodifiableList(events);
+  }
+
+  public List<TezEvent> getEvents() {
+    return events;
+  }
+
+  public boolean shouldDie() {
+    return shouldDie;
+  }
+
+  public long getLastRequestId() {
+    return lastRequestId;
+  }
+
+  public void setEvents(List<TezEvent> events) {
+    this.events = Collections.unmodifiableList(events);
+  }
+
+  public void setLastRequestId(long lastRequestId ) {
+    this.lastRequestId = lastRequestId;
+  }
+
+  public void setShouldDie() {
+    this.shouldDie = true;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(lastRequestId);
+    out.writeBoolean(shouldDie);
+    if(events != null) {
+      out.writeBoolean(true);
+      out.writeInt(events.size());
+      for (TezEvent e : events) {
+        e.write(out);
+      }
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    lastRequestId = in.readLong();
+    shouldDie = in.readBoolean();
+    if(in.readBoolean()) {
+      int eventCount = in.readInt();
+      events = new ArrayList<TezEvent>(eventCount);
+      for (int i = 0; i < eventCount; ++i) {
+        TezEvent e = new TezEvent();
+        e.readFields(in);
+        events.add(e);
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "{ "
+        + " lastRequestId=" + lastRequestId
+        + ", shouldDie=" + shouldDie
+        + ", eventCount=" + (events != null ? events.size() : 0)
+        + " }";
+  }
+}