You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/11 00:28:36 UTC

git commit: TEZ-426. Changes in YarnTezDagChild for logical task using TaskSpec. (hitesh)

Updated Branches:
  refs/heads/TEZ-398 da0cca381 -> ee47464dd


TEZ-426. Changes in YarnTezDagChild for logical task using TaskSpec. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/ee47464d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/ee47464d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/ee47464d

Branch: refs/heads/TEZ-398
Commit: ee47464ddb9a2bce75fdc8805a8786416d0c6078
Parents: da0cca3
Author: Hitesh Shah <hi...@apache.org>
Authored: Tue Sep 10 15:28:15 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Tue Sep 10 15:28:15 2013 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/YarnTezDagChild.java   | 274 +++++++++++++++----
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  25 +-
 .../engine/newapi/events/InputFailedEvent.java  |   9 +
 tez-engine-api/src/main/proto/Events.proto      |  12 +-
 .../tez/common/TezTaskUmbilicalProtocol.java    |   4 +-
 .../events/TaskAttemptCompletedEvent.java       |  28 ++
 .../newapi/events/TaskAttemptFailedEvent.java   |  35 +++
 .../engine/newapi/events/TaskFailedEvent.java   |  35 ---
 .../tez/engine/newapi/impl/EventType.java       |   3 +-
 .../apache/tez/engine/newapi/impl/TezEvent.java |  78 +++++-
 .../engine/newapi/impl/TezHeartbeatRequest.java |   9 +-
 .../newapi/impl/TezHeartbeatResponse.java       |  38 ++-
 .../engine/newapi/impl/TezInputContextImpl.java |  19 +-
 .../newapi/impl/TezOutputContextImpl.java       |  18 +-
 .../newapi/impl/TezProcessorContextImpl.java    |  18 +-
 .../engine/newapi/impl/TezTaskContextImpl.java  |   8 +-
 .../tez/engine/newapi/impl/TezUmbilical.java    | 197 +------------
 .../LogicalIOProcessorRuntimeTask.java          |  38 ++-
 tez-engine/src/main/proto/Events.proto          |   5 +-
 .../apache/hadoop/mapred/LocalJobRunnerTez.java |  12 +-
 .../tez/mapreduce/TestUmbilicalProtocol.java    |  11 +-
 21 files changed, 531 insertions(+), 345 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 2bac9b6..3141910 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -26,6 +26,14 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSError;
@@ -39,6 +47,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
@@ -56,9 +65,9 @@ import org.apache.tez.common.counters.Limits;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.api.Task;
 import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle;
 import org.apache.tez.engine.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.engine.common.objectregistry.ObjectRegistryModule;
@@ -67,7 +76,11 @@ import org.apache.tez.engine.common.security.TokenCache;
 import org.apache.tez.engine.newapi.impl.InputSpec;
 import org.apache.tez.engine.newapi.impl.OutputSpec;
 import org.apache.tez.engine.newapi.impl.TaskSpec;
-import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
+import org.apache.tez.engine.newapi.impl.TezUmbilical;
+import org.apache.tez.engine.newruntime.LogicalIOProcessorRuntimeTask;
 import org.apache.tez.mapreduce.input.SimpleInput;
 import org.apache.tez.mapreduce.output.SimpleOutput;
 
@@ -82,6 +95,140 @@ public class YarnTezDagChild {
 
   private static final Logger LOG = Logger.getLogger(YarnTezDagChild.class);
 
+  private static AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private static String containerIdStr;
+  private static int eventCounter = 0;
+  private static int maxEventsToGet = 0;
+  private static LinkedList<TezEvent> eventsToSend =
+      new LinkedList<TezEvent>();
+  private static ConcurrentLinkedQueue<TezEvent> eventsToBeProcessed =
+      new ConcurrentLinkedQueue<TezEvent>();
+  private static AtomicLong requestCounter = new AtomicLong(0);
+  private static TezTaskAttemptID currentTaskAttemptID;
+  private static long amPollInterval;
+  private static TezTaskUmbilicalProtocol umbilical;
+  private static Object eventLock = new Object();
+  private static ReentrantReadWriteLock taskLock = new ReentrantReadWriteLock();
+  private static LogicalIOProcessorRuntimeTask currentTask = null;
+
+  private static Thread startHeartbeatThread() {
+    Thread heartbeatThread = new Thread(new Runnable() {
+      public void run() {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            Thread.sleep(amPollInterval);
+            try {
+              heartbeat();
+            } catch (TezException e) {
+              LOG.error("Error communicating with AM: " + e.getMessage() , e);
+              // TODO TODONEWTEZ
+            } catch (InvalidToken e) {
+              LOG.error("Error in authenticating with AM: ", e);
+              // TODO TODONEWTEZ
+            } catch (Exception e) {
+              LOG.error("Error in heartbeating with AM. ", e);
+              // TODO TODONEWTEZ
+            }
+          } catch (InterruptedException e) {
+            if (!stopped.get()) {
+              LOG.warn("Heartbeat thread interrupted. Returning.");
+            }
+            return;
+          }
+        }
+      }
+    });
+    heartbeatThread.setName("Tez Container Heartbeat Thread ["
+        + containerIdStr + "]");
+    heartbeatThread.start();
+    return heartbeatThread;
+  }
+
+  private static Thread startRouterThread() {
+    Thread eventRouterThread = new Thread(new Runnable() {
+      public void run() {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          while (true) {
+            try {
+              taskLock.readLock().lock();
+              if (currentTask != null) {
+                break;
+              }
+            } finally {
+              taskLock.readLock().unlock();
+            }
+            try {
+              Thread.sleep(10);
+            } catch (InterruptedException e) {
+              if (!stopped.get()) {
+                LOG.warn("Event Router thread interrupted. Returning.");
+              }
+              return;
+            }
+          }
+          try {
+            TezEvent e = eventsToBeProcessed.poll();
+            if (e == null) {
+              eventsToBeProcessed.wait();
+              continue;
+            }
+            // TODO TODONEWTEZ
+            if (!e.getDestinationInfo().getTaskAttemptID().equals(
+                currentTaskAttemptID)) {
+              // error? or block?
+              continue;
+            }
+            try {
+              taskLock.readLock().lock();
+              if (currentTask != null) {
+                currentTask.handleEvent(e);
+              }
+            } finally {
+              taskLock.readLock().unlock();
+            }
+          } catch (InterruptedException e) {
+            if (!stopped.get()) {
+              LOG.warn("Event Router thread interrupted. Returning.");
+            }
+            return;
+          }
+        }
+      }
+    });
+    eventRouterThread.setName("Tez Container Event Router Thread ["
+        + containerIdStr + "]");
+    eventRouterThread.start();
+    return eventRouterThread;
+  }
+
+  private static void heartbeat() throws TezException, IOException {
+    try {
+      taskLock.readLock().lock();
+      if (currentTask == null) {
+        return;
+      }
+    } finally {
+      taskLock.readLock().unlock();
+    }
+    synchronized (eventLock) {
+      List<TezEvent> events = new ArrayList<TezEvent>();
+      events.addAll(eventsToSend);
+      long reqId = requestCounter.incrementAndGet();
+      TezHeartbeatRequest request = new TezHeartbeatRequest(reqId, events,
+          currentTaskAttemptID, eventCounter, maxEventsToGet);
+      TezHeartbeatResponse response = umbilical.heartbeat(request);
+      if (response.getLastRequestId() != reqId) {
+        // TODO TODONEWTEZ
+        throw new TezException("AM and Task out of sync");
+      }
+      eventsToSend.clear();
+      eventCounter += response.getEvents().size();
+      eventsToBeProcessed.addAll(response.getEvents());
+      eventsToBeProcessed.notifyAll();
+    }
+  }
+
   public static void main(String[] args) throws Throwable {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     if (LOG.isDebugEnabled()) {
@@ -90,8 +237,8 @@ public class YarnTezDagChild {
 
     final Configuration defaultConf = new Configuration();
     TezUtils.addUserSpecifiedTezConfiguration(defaultConf);
-    // Security settings will be loaded based on core-site and core-default. Don't
-    // depend on the jobConf for this.
+    // Security settings will be loaded based on core-site and core-default.
+    // Don't depend on the jobConf for this.
     UserGroupInformation.setConfiguration(defaultConf);
     Limits.setConfiguration(defaultConf);
 
@@ -127,6 +274,13 @@ public class YarnTezDagChild {
       }
     }
 
+    amPollInterval = defaultConf.getLong(
+        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
+    maxEventsToGet = defaultConf.getInt(
+        TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
+        TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
+
     // Create TaskUmbilicalProtocol as actual task owner.
     UserGroupInformation taskOwner =
       UserGroupInformation.createRemoteUser(tokenIdentifier);
@@ -134,7 +288,7 @@ public class YarnTezDagChild {
     Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(credentials);
     SecurityUtil.setTokenService(jobToken, address);
     taskOwner.addToken(jobToken);
-    final TezTaskUmbilicalProtocol umbilical =
+    umbilical =
       taskOwner.doAs(new PrivilegedExceptionAction<TezTaskUmbilicalProtocol>() {
       @Override
       public TezTaskUmbilicalProtocol run() throws Exception {
@@ -143,6 +297,18 @@ public class YarnTezDagChild {
       }
     });
 
+    Thread heartbeatThread = startHeartbeatThread();
+    Thread eventRouterThread = startRouterThread();
+
+    TezUmbilical tezUmbilical = new TezUmbilical() {
+      @Override
+      public void addEvents(Collection<TezEvent> events) {
+        synchronized (eventLock) {
+          eventsToSend.addAll(events);
+        }
+      }
+    };
+
     // report non-pid to application master
     String pid = System.getenv().get("JVM_PID");
     if (LOG.isDebugEnabled()) {
@@ -151,7 +317,6 @@ public class YarnTezDagChild {
     TaskSpec taskSpec = null;
     ContainerTask containerTask = null;
     UserGroupInformation childUGI = null;
-    TezTaskAttemptID taskAttemptId = null;
     ContainerContext containerContext = new ContainerContext(
         containerIdentifier, pid);
     int getTaskMaxSleepTime = defaultConf.getInt(
@@ -174,8 +339,9 @@ public class YarnTezDagChild {
         }
         LOG.info("TaskInfo: shouldDie: "
             + containerTask.shouldDie()
-            + (containerTask.shouldDie() == true ? "" : ", taskAttemptId: "
-                + containerTask.getTaskSpec().getTaskAttemptID()));
+            + (containerTask.shouldDie() == true ?
+                "" : ", currentTaskAttemptId: "
+                  + containerTask.getTaskSpec().getTaskAttemptID()));
 
         if (containerTask.shouldDie()) {
           return;
@@ -186,25 +352,29 @@ public class YarnTezDagChild {
           LOG.debug("New container task context:"
               + taskSpec.toString());
         }
-        taskAttemptId = taskSpec.getTaskAttemptID();
-        TezVertexID newVertexId = taskAttemptId.getTaskID().getVertexID();
 
-        if (currentVertexId != null) {
-          if (!currentVertexId.equals(newVertexId)) {
-            objectRegistry.clearCache(ObjectLifeCycle.VERTEX);
-          }
-          if (!currentVertexId.getDAGId().equals(newVertexId.getDAGId())) {
-            objectRegistry.clearCache(ObjectLifeCycle.DAG);
+        try {
+          taskLock.writeLock().lock();
+          currentTaskAttemptID = taskSpec.getTaskAttemptID();
+          TezVertexID newVertexId =
+              currentTaskAttemptID.getTaskID().getVertexID();
+
+          if (currentVertexId != null) {
+            if (!currentVertexId.equals(newVertexId)) {
+              objectRegistry.clearCache(ObjectLifeCycle.VERTEX);
+            }
+            if (!currentVertexId.getDAGId().equals(newVertexId.getDAGId())) {
+              objectRegistry.clearCache(ObjectLifeCycle.DAG);
+            }
           }
+          currentVertexId = newVertexId;
+          updateLoggers(currentTaskAttemptID);
+          currentTask = createLogicalTask(
+              taskSpec, defaultConf, tezUmbilical);
+        } finally {
+          taskLock.writeLock().unlock();
         }
-        currentVertexId = newVertexId;
-
-        updateLoggers(taskAttemptId);
-
-        final Task t = createAndConfigureTezTask(taskSpec, umbilical,
-            credentials, jobToken, attemptNumber);
 
-        final Configuration conf = ((RuntimeTask)t).getConfiguration();
 
         // TODO Initiate Java VM metrics
         // JvmMetrics.initSingleton(containerId.toString(), job.getSessionId());
@@ -216,7 +386,15 @@ public class YarnTezDagChild {
         childUGI.doAs(new PrivilegedExceptionAction<Object>() {
           @Override
           public Object run() throws Exception {
-            runTezTask(t, umbilical, conf); // run the task
+            currentTask.initialize();
+            currentTask.run();
+            currentTask.close();
+            try {
+              taskLock.writeLock().lock();
+              currentTask = null;
+            } finally {
+              taskLock.writeLock().unlock();
+            }
             return null;
           }
         });
@@ -225,15 +403,18 @@ public class YarnTezDagChild {
       }
     } catch (FSError e) {
       LOG.fatal("FSError from child", e);
-      umbilical.fsError(taskAttemptId, e.getMessage());
+      umbilical.fsError(currentTaskAttemptID, e.getMessage());
     } catch (Throwable throwable) {
       LOG.fatal("Error running child : "
     	        + StringUtils.stringifyException(throwable));
-      if (taskAttemptId != null) {
+      if (currentTaskAttemptID != null) {
         String cause = StringUtils.stringifyException(throwable);
-        umbilical.fatalError(taskAttemptId, cause);
+        umbilical.fatalError(currentTaskAttemptID, cause);
       }
     } finally {
+      stopped.set(true);
+      eventRouterThread.join();
+      heartbeatThread.join();
       RPC.stopProxy(umbilical);
       DefaultMetricsSystem.shutdown();
       // Shutting down log4j of the child-vm...
@@ -297,17 +478,15 @@ public class YarnTezDagChild {
     conf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString());
   }
 
-  private static Task createAndConfigureTezTask(
-      TaskSpec taskSpec, TezTaskUmbilicalProtocol master,
-      Credentials cxredentials, Token<JobTokenIdentifier> jobToken,
-      int appAttemptId) throws IOException, InterruptedException {
-
-    Configuration conf = new Configuration(false);
-    // set tcp nodelay
-    conf.setBoolean("ipc.client.tcpnodelay", true);
-    conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);
+  private static LogicalIOProcessorRuntimeTask createLogicalTask(
+      TaskSpec taskSpec, Configuration conf,
+      TezUmbilical tezUmbilical) throws IOException {
 
+    // FIXME TODONEWTEZ
+    // conf.setBoolean("ipc.client.tcpnodelay", true);
+    // conf.setInt(TezJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);
     configureLocalDirs(conf);
+    FileSystem.get(conf).setWorkingDirectory(getWorkingDirectory(conf));
 
     // FIXME need Input/Output vertices else we have this hack
     if (taskSpec.getInputs().isEmpty()) {
@@ -326,29 +505,10 @@ public class YarnTezDagChild {
       taskSpec.getOutputs().add(
           new OutputSpec("null", simpleOutputDesc, 0));
     }
-    Task t = null;
-
-    // FIXME TODONEWTEZ
-
-    // RuntimeUtils.createRuntimeTask(taskSpec);
-    // t.initialize(conf, taskSpec.getProcessorUserPayload(), master);
-
-    // FIXME wrapper should initialize all of processor, inputs and outputs
-    // Currently, processor is inited via task init
-    // and processor then inits inputs and outputs
-    return t;
+    return new LogicalIOProcessorRuntimeTask(taskSpec, conf,
+        tezUmbilical);
   }
 
-  private static void runTezTask(
-      Task t, TezTaskUmbilicalProtocol master, Configuration conf)
-  throws IOException, InterruptedException {
-    // use job-specified working directory
-    FileSystem.get(conf).setWorkingDirectory(getWorkingDirectory(conf));
-
-    // Run!
-    t.run();
-    t.close();
-  }
 
   private static Path getWorkingDirectory(Configuration conf) {
     String name = conf.get(JobContext.WORKING_DIR);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 6d64a58..2c242de 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -59,7 +59,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.app.security.authorize.MRAMPolicyProvider;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.common.security.JobTokenSecretManager;
-import org.apache.tez.engine.newapi.events.TaskFailedEvent;
+import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
 import org.apache.tez.engine.newapi.impl.TezEvent;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
@@ -577,10 +577,11 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
-  public void taskFailed(TezTaskAttemptID taskAttemptId,
-      TezEvent tezEvent) throws IOException {
-    TaskFailedEvent taskFailedEvent = (TaskFailedEvent) tezEvent.getEvent();
-    LOG.fatal("Task: " + taskAttemptId + " - failed : "
+  public void taskAttemptFailed(TezTaskAttemptID taskAttemptId,
+      TezEvent tezAttemptFailedEvent) throws IOException {
+    TaskAttemptFailedEvent taskFailedEvent =
+        (TaskAttemptFailedEvent) tezAttemptFailedEvent.getEvent();
+    LOG.fatal("Task Attempt: " + taskAttemptId + " - failed : "
         + taskFailedEvent.getDiagnostics());
     reportDiagnosticInfo(taskAttemptId, "Error: "
         + taskFailedEvent.getDiagnostics());
@@ -590,4 +591,18 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
 
   }
 
+  @Override
+  public void taskAttemptCompleted(TezTaskAttemptID taskAttemptId,
+      TezEvent taskAttemptCompletedEvent) throws IOException {
+    LOG.info("Task attempt completed acknowledgement from "
+      + taskAttemptId.toString());
+
+    taskHeartbeatHandler.progressing(taskAttemptId);
+    pingContainerHeartbeatHandler(taskAttemptId);
+
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_DONE));
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java
index ac49250..042590e 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputFailedEvent.java
@@ -56,6 +56,15 @@ public class InputFailedEvent extends Event{
     this.sourceIndex = sourceIndex;
   }
 
+  @Private
+  public InputFailedEvent(int sourceIndex,
+      int targetIndex,
+      int version) {
+    this.sourceIndex = sourceIndex;
+    this.targetIndex = targetIndex;
+    this.version = version;
+  }
+
   public int getSourceIndex() {
     return sourceIndex;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/proto/Events.proto b/tez-engine-api/src/main/proto/Events.proto
index 3651c35..21cacf6 100644
--- a/tez-engine-api/src/main/proto/Events.proto
+++ b/tez-engine-api/src/main/proto/Events.proto
@@ -27,8 +27,18 @@ message DataMovementEventProto {
   optional int32 version = 4;
 }
 
-message InputDataErrorEventProto {
+message InputReadErrorEventProto {
   optional int32 index = 1;
   optional string diagnostics = 2;
   optional int32 version = 3;
 }
+
+message InputFailedEventProto {
+  optional int32 source_index = 1;
+  optional int32 target_index = 2;
+  optional int32 version = 4;
+}
+
+message InputInformationEventProto {
+  optional bytes user_payload = 1;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
index af2193c..c1289e6 100644
--- a/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
+++ b/tez-engine/src/main/java/org/apache/tez/common/TezTaskUmbilicalProtocol.java
@@ -87,7 +87,9 @@ public interface TezTaskUmbilicalProtocol extends Master {
   public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
       throws IOException, TezException;
 
-  public void taskFailed(TezTaskAttemptID attemptID,
+  public void taskAttemptFailed(TezTaskAttemptID attemptID,
       TezEvent taskFailedEvent) throws IOException;
 
+  public void taskAttemptCompleted(TezTaskAttemptID attemptID,
+      TezEvent taskAttemptCompletedEvent) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java
new file mode 100644
index 0000000..d3a582d
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptCompletedEvent.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.events;
+
+import org.apache.tez.engine.newapi.Event;
+
+public class TaskAttemptCompletedEvent extends Event {
+
+  public TaskAttemptCompletedEvent() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java
new file mode 100644
index 0000000..772d7fe
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskAttemptFailedEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.events;
+
+import org.apache.tez.engine.newapi.Event;
+
+public class TaskAttemptFailedEvent extends Event {
+
+  private final String diagnostics;
+
+  public TaskAttemptFailedEvent(String diagnostics) {
+    this.diagnostics = diagnostics;
+  }
+
+  public String getDiagnostics() {
+    return diagnostics;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskFailedEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskFailedEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskFailedEvent.java
deleted file mode 100644
index ddd346f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/events/TaskFailedEvent.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.events;
-
-import org.apache.tez.engine.newapi.Event;
-
-public class TaskFailedEvent extends Event {
-
-  private final String diagnostics;
-
-  public TaskFailedEvent(String diagnostics) {
-    this.diagnostics = diagnostics;
-  }
-
-  public String getDiagnostics() {
-    return diagnostics;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
index 5b71d70..51a1b24 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/EventType.java
@@ -19,7 +19,8 @@
 package org.apache.tez.engine.newapi.impl;
 
 public enum EventType {
-  TASK_FAILED_EVENT,
+  TASK_ATTEMPT_COMPLETED_EVENT,
+  TASK_ATTEMPT_FAILED_EVENT,
   DATA_MOVEMENT_EVENT,
   INPUT_READ_ERROR_EVENT,
   INPUT_FAILED_EVENT,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
index da3e551..f8cc3ed 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezEvent.java
@@ -25,12 +25,18 @@ import java.io.IOException;
 import org.apache.hadoop.io.Writable;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.engine.api.events.EventProtos.DataMovementEventProto;
-import org.apache.tez.engine.api.events.EventProtos.InputDataErrorEventProto;
-import org.apache.tez.engine.api.events.SystemEventProtos.TaskFailedEventProto;
+import org.apache.tez.engine.api.events.EventProtos.InputFailedEventProto;
+import org.apache.tez.engine.api.events.EventProtos.InputInformationEventProto;
+import org.apache.tez.engine.api.events.EventProtos.InputReadErrorEventProto;
+import org.apache.tez.engine.api.events.SystemEventProtos.TaskAttemptCompletedEventProto;
+import org.apache.tez.engine.api.events.SystemEventProtos.TaskAttemptFailedEventProto;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.events.InputInformationEvent;
 import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
-import org.apache.tez.engine.newapi.events.TaskFailedEvent;
+import org.apache.tez.engine.newapi.events.TaskAttemptCompletedEvent;
+import org.apache.tez.engine.newapi.events.TaskAttemptFailedEvent;
 
 import com.google.protobuf.ByteString;
 
@@ -54,8 +60,14 @@ public class TezEvent implements Writable {
       eventType = EventType.DATA_MOVEMENT_EVENT;
     } else if (event instanceof InputReadErrorEvent) {
       eventType = EventType.INPUT_READ_ERROR_EVENT;
-    } else if (event instanceof TaskFailedEvent) {
-      eventType = EventType.TASK_FAILED_EVENT;
+    } else if (event instanceof TaskAttemptFailedEvent) {
+      eventType = EventType.TASK_ATTEMPT_FAILED_EVENT;
+    } else if (event instanceof TaskAttemptCompletedEvent) {
+      eventType = EventType.TASK_ATTEMPT_COMPLETED_EVENT;
+    } else if (event instanceof InputInformationEvent) {
+      eventType = EventType.INTPUT_INFORMATION_EVENT;
+    } else if (event instanceof InputFailedEvent) {
+      eventType = EventType.INPUT_FAILED_EVENT;
     } else {
       throw new TezUncheckedException("Unknown event, event="
           + event.getClass().getName());
@@ -104,17 +116,35 @@ public class TezEvent implements Writable {
       break;
     case INPUT_READ_ERROR_EVENT:
       InputReadErrorEvent ideEvt = (InputReadErrorEvent) event;
-      eventBytes = InputDataErrorEventProto.newBuilder()
+      eventBytes = InputReadErrorEventProto.newBuilder()
           .setIndex(ideEvt.getIndex())
           .setDiagnostics(ideEvt.getDiagnostics())
           .build().toByteArray();
       break;
-    case TASK_FAILED_EVENT:
-      TaskFailedEvent tfEvt = (TaskFailedEvent) event;
-      eventBytes = TaskFailedEventProto.newBuilder()
+    case TASK_ATTEMPT_FAILED_EVENT:
+      TaskAttemptFailedEvent tfEvt = (TaskAttemptFailedEvent) event;
+      eventBytes = TaskAttemptFailedEventProto.newBuilder()
           .setDiagnostics(tfEvt.getDiagnostics())
           .build().toByteArray();
       break;
+    case TASK_ATTEMPT_COMPLETED_EVENT:
+      eventBytes = TaskAttemptCompletedEventProto.newBuilder()
+          .build().toByteArray();
+      break;
+    case INPUT_FAILED_EVENT:
+      InputFailedEvent ifEvt = (InputFailedEvent) event;
+      eventBytes = InputFailedEventProto.newBuilder()
+          .setSourceIndex(ifEvt.getSourceIndex())
+          .setTargetIndex(ifEvt.getTargetIndex())
+          .setVersion(ifEvt.getVersion()).build().toByteArray();
+    case INTPUT_INFORMATION_EVENT:
+      InputInformationEvent iEvt = (InputInformationEvent) event;
+      eventBytes = InputInformationEventProto.newBuilder()
+          .setUserPayload(ByteString.copyFrom(iEvt.getUserPayload()))
+          .build().toByteArray();
+    default:
+      throw new TezUncheckedException("Unknown TezEvent"
+         + ", type=" + eventType);
     }
     out.writeInt(eventType.ordinal());
     out.writeInt(eventBytes.length);
@@ -138,16 +168,34 @@ public class TezEvent implements Writable {
           dmProto.getUserPayload().toByteArray());
       break;
     case INPUT_READ_ERROR_EVENT:
-      InputDataErrorEventProto ideProto =
-          InputDataErrorEventProto.parseFrom(eventBytes);
+      InputReadErrorEventProto ideProto =
+          InputReadErrorEventProto.parseFrom(eventBytes);
       event = new InputReadErrorEvent(ideProto.getDiagnostics(),
           ideProto.getIndex(), ideProto.getVersion());
       break;
-    case TASK_FAILED_EVENT:
-      TaskFailedEventProto tfProto =
-          TaskFailedEventProto.parseFrom(eventBytes);
-      event = new TaskFailedEvent(tfProto.getDiagnostics());
+    case TASK_ATTEMPT_FAILED_EVENT:
+      TaskAttemptFailedEventProto tfProto =
+          TaskAttemptFailedEventProto.parseFrom(eventBytes);
+      event = new TaskAttemptFailedEvent(tfProto.getDiagnostics());
+      break;
+    case TASK_ATTEMPT_COMPLETED_EVENT:
+      event = new TaskAttemptCompletedEvent();
+      break;
+    case INPUT_FAILED_EVENT:
+      InputFailedEventProto ifProto =
+          InputFailedEventProto.parseFrom(eventBytes);
+      event = new InputFailedEvent(ifProto.getSourceIndex(),
+          ifProto.getTargetIndex(), ifProto.getVersion());
+      break;
+    case INTPUT_INFORMATION_EVENT:
+      InputInformationEventProto infoProto =
+          InputInformationEventProto.parseFrom(eventBytes);
+      event = new InputInformationEvent(
+          infoProto.getUserPayload().toByteArray());
       break;
+    default:
+      throw new TezUncheckedException("Unknown TezEvent"
+         + ", type=" + eventType);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
index cda456c..e47f14b 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatRequest.java
@@ -35,11 +35,12 @@ public class TezHeartbeatRequest implements Writable {
   private TezTaskAttemptID currentTaskAttemptID;
   private int startIndex;
   private int maxEvents;
+  private long requestId;
 
   public TezHeartbeatRequest() {
   }
 
-  public TezHeartbeatRequest(List<TezEvent> events,
+  public TezHeartbeatRequest(long requestId, List<TezEvent> events,
       TezTaskAttemptID taskAttemptID,
       int startIndex, int maxEvents) {
     this.events = Collections.unmodifiableList(events);
@@ -60,6 +61,10 @@ public class TezHeartbeatRequest implements Writable {
     return maxEvents;
   }
 
+  public long getRequestId() {
+    return requestId;
+  }
+
   public TezTaskAttemptID getCurrentTaskAttemptID() {
     return currentTaskAttemptID;
   }
@@ -78,6 +83,7 @@ public class TezHeartbeatRequest implements Writable {
     }
     out.writeInt(startIndex);
     out.writeInt(maxEvents);
+    out.writeLong(requestId);
   }
 
   @Override
@@ -97,6 +103,7 @@ public class TezHeartbeatRequest implements Writable {
     }
     startIndex = in.readInt();
     maxEvents = in.readInt();
+    requestId = in.readLong();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
index 35c961b..572c7b6 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezHeartbeatResponse.java
@@ -18,13 +18,22 @@
 
 package org.apache.tez.engine.newapi.impl;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.io.Writable;
 
-public class TezHeartbeatResponse {
+public class TezHeartbeatResponse implements Writable {
 
-  private final List<TezEvent> events;
+  private long lastRequestId;
+  private List<TezEvent> events;
+
+  public TezHeartbeatResponse() {
+  }
 
   public TezHeartbeatResponse(List<TezEvent> events) {
     this.events = Collections.unmodifiableList(events);
@@ -34,4 +43,29 @@ public class TezHeartbeatResponse {
     return events;
   }
 
+  public long getLastRequestId() {
+    return lastRequestId;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeLong(lastRequestId);
+    out.writeInt(events.size());
+    for (TezEvent e : events) {
+      e.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    lastRequestId = in.readLong();
+    int eventCount = in.readInt();
+    events = new ArrayList<TezEvent>(eventCount);
+    for (int i = 0; i < eventCount; ++i) {
+      TezEvent e = new TezEvent();
+      e.readFields(in);
+      events.add(e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index 567057a..b4558d0 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.engine.newapi.impl;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -26,26 +27,38 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventGenerator;
 
 public class TezInputContextImpl extends TezTaskContextImpl
     implements TezInputContext {
 
   private final byte[] userPayload;
   private final String sourceVertexName;
+  private final TezUmbilical tezUmbilical;
+  private final EventMetaData sourceInfo;
 
   @Private
-  public TezInputContextImpl(Configuration conf, String taskVertexName,
+  public TezInputContextImpl(Configuration conf,
+      TezUmbilical tezUmbilical, String taskVertexName,
       String sourceVertexName, TezTaskAttemptID taskAttemptID,
       TezCounters counters, byte[] userPayload) {
     super(conf, taskVertexName, taskAttemptID, counters);
+    this.tezUmbilical = tezUmbilical;
     this.userPayload = userPayload;
     this.sourceVertexName = sourceVertexName;
+    this.sourceInfo = new EventMetaData(
+        EventGenerator.INPUT, taskVertexName, sourceVertexName,
+        taskAttemptID);
   }
 
   @Override
   public void sendEvents(List<Event> events) {
-    // TODO Auto-generated method stub
-
+    List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
+    for (Event e : events) {
+      TezEvent tEvt = new TezEvent(e, sourceInfo);
+      tezEvents.add(tEvt);
+    }
+    tezUmbilical.addEvents(tezEvents);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index a2ce60b..ba632db 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.engine.newapi.impl;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -26,27 +27,38 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventGenerator;
 
 public class TezOutputContextImpl extends TezTaskContextImpl
     implements TezOutputContext {
 
   private final byte[] userPayload;
   private final String destinationVertexName;
+  private final TezUmbilical tezUmbilical;
+  private final EventMetaData sourceInfo;
 
   @Private
-  public TezOutputContextImpl(Configuration conf, String taskVertexName,
+  public TezOutputContextImpl(Configuration conf,
+      TezUmbilical tezUmbilical, String taskVertexName,
       String destinationVertexName,
       TezTaskAttemptID taskAttemptID, TezCounters counters,
       byte[] userPayload) {
     super(conf, taskVertexName, taskAttemptID, counters);
     this.userPayload = userPayload;
     this.destinationVertexName = destinationVertexName;
+    this.tezUmbilical = tezUmbilical;
+    this.sourceInfo = new EventMetaData(EventGenerator.OUTPUT, taskVertexName,
+        destinationVertexName, taskAttemptID);
   }
 
   @Override
   public void sendEvents(List<Event> events) {
-    // TODO Auto-generated method stub
-
+    List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
+    for (Event e : events) {
+      TezEvent tEvt = new TezEvent(e, sourceInfo);
+      tezEvents.add(tEvt);
+    }
+    tezUmbilical.addEvents(tezEvents);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index b987bfe..4e0f061 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.tez.engine.newapi.impl;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -25,23 +26,34 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.Event;
 import org.apache.tez.engine.newapi.TezProcessorContext;
+import org.apache.tez.engine.newapi.impl.EventMetaData.EventGenerator;
 
 public class TezProcessorContextImpl extends TezTaskContextImpl
   implements TezProcessorContext {
 
   private final byte[] userPayload;
+  private final TezUmbilical tezUmbilical;
+  private final EventMetaData sourceInfo;
 
-  public TezProcessorContextImpl(Configuration tezConf, String vertexName,
+  public TezProcessorContextImpl(Configuration tezConf,
+      TezUmbilical tezUmbilical, String vertexName,
       TezTaskAttemptID taskAttemptID, TezCounters counters,
       byte[] userPayload) {
     super(tezConf, vertexName, taskAttemptID, counters);
     this.userPayload = userPayload;
+    this.tezUmbilical = tezUmbilical;
+    this.sourceInfo = new EventMetaData(EventGenerator.PROCESSOR,
+        taskVertexName, "", taskAttemptID);
   }
 
   @Override
   public void sendEvents(List<Event> events) {
-    // TODO Auto-generated method stub
-
+    List<TezEvent> tezEvents = new ArrayList<TezEvent>(events.size());
+    for (Event e : events) {
+      TezEvent tEvt = new TezEvent(e, sourceInfo);
+      tezEvents.add(tEvt);
+    }
+    tezUmbilical.addEvents(tezEvents);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
index c89003e..712eec3 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
@@ -26,10 +26,10 @@ import org.apache.tez.engine.newapi.TezTaskContext;
 
 public abstract class TezTaskContextImpl implements TezTaskContext {
 
-  private final Configuration conf;
-  private final String taskVertexName;
-  private final TezTaskAttemptID taskAttemptID;
-  private final TezCounters counters;
+  protected final Configuration conf;
+  protected final String taskVertexName;
+  protected final TezTaskAttemptID taskAttemptID;
+  protected final TezCounters counters;
 
   @Private
   public TezTaskContextImpl(Configuration conf,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
index 51daf06..c3065fe 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
@@ -18,203 +18,10 @@
 
 package org.apache.tez.engine.newapi.impl;
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.records.TezTaskAttemptID;
+public interface TezUmbilical {
 
-/**
- * Interface to the RPC layer ( umbilical ) between the Tez AM and
- * a Tez Container's JVM.
- */
-public class TezUmbilical extends AbstractService {
-
-  private static final Log LOG = LogFactory.getLog(TezUmbilical.class);
-
-  private final TezTaskUmbilicalProtocol umbilical;
-  private Thread heartbeatThread;
-  private Thread eventRouterThread;
-  private AtomicBoolean stopped = new AtomicBoolean(false);
-  private long amPollInterval;
-  private final String containerIdStr;
-
-  private TezTaskAttemptID currentTaskAttemptID;
-  private int eventCounter = 0;
-  private int maxEventsToGet = 0;
-  private LinkedList<TezEvent> eventsToSend;
-  private ConcurrentLinkedQueue<TezEvent> eventsToBeProcessed;
-
-  public TezUmbilical(TezTaskUmbilicalProtocol umbilical,
-      String containerIdStr) {
-    super(TezUmbilical.class.getName());
-    this.umbilical = umbilical;
-    this.containerIdStr = containerIdStr;
-    this.eventsToSend = new LinkedList<TezEvent>();
-    this.eventsToBeProcessed = new ConcurrentLinkedQueue<TezEvent>();
-  }
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    amPollInterval = conf.getLong(
-        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
-        TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT);
-    maxEventsToGet = conf.getInt(
-        TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT,
-        TezConfiguration.TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT);
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    startHeartbeatThread();
-    startRouterThread();
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    stopped.set(true);
-    eventRouterThread.interrupt();
-    super.serviceStop();
-  }
-
-  private void startHeartbeatThread() {
-    heartbeatThread = new Thread(new Runnable() {
-      public void run() {
-        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
-          try {
-            Thread.sleep(amPollInterval);
-            try {
-              heartbeat();
-            } catch (TezException e) {
-              LOG.error("Error communicating with AM: " + e.getMessage() , e);
-              // TODO TODONEWTEZ
-            } catch (InvalidToken e) {
-              LOG.error("Error in authencating with AM: ", e);
-              // TODO TODONEWTEZ
-            } catch (Exception e) {
-              LOG.error("Error in heartbeating with AM. ", e);
-              // TODO TODONEWTEZ
-            }
-          } catch (InterruptedException e) {
-            if (!stopped.get()) {
-              LOG.warn("Heartbeat thread interrupted. Returning.");
-            }
-            return;
-          }
-        }
-      }
-    });
-    heartbeatThread.setName("Tez Container Heartbeat Thread ["
-        + containerIdStr + "]");
-    heartbeatThread.start();
-  }
-
-  private void startRouterThread() {
-    eventRouterThread = new Thread(new Runnable() {
-      public void run() {
-        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
-          try {
-            TezEvent e = eventsToBeProcessed.poll();
-            if (e == null) {
-              eventsToBeProcessed.wait();
-            }
-            // TODO TODONEWTEZ
-            switch (e.getEventType()) {
-            case DATA_MOVEMENT_EVENT:
-              // redirect to input of current task
-              if (!e.getDestinationInfo().getTaskAttemptID().equals(
-                  currentTaskAttemptID)) {
-                // error? or block?
-              }
-              // route to appropriate input
-              break;
-            case TASK_FAILED_EVENT:
-              // route to ???
-              break;
-            case INPUT_READ_ERROR_EVENT:
-              // invalid event? ignore?
-              break;
-            }
-          } catch (InterruptedException e) {
-            if (!stopped.get()) {
-              LOG.warn("Event Router thread interrupted. Returning.");
-            }
-            return;
-          }
-        }
-      }
-    });
-    eventRouterThread.setName("Tez Container Event Router Thread ["
-        + containerIdStr + "]");
-    eventRouterThread.start();
-  }
-
-  private synchronized void heartbeat() throws TezException, IOException {
-    List<TezEvent> events = new ArrayList<TezEvent>();
-    events.addAll(eventsToSend);
-    TezHeartbeatRequest request = new TezHeartbeatRequest(events,
-        currentTaskAttemptID, eventCounter, maxEventsToGet);
-    TezHeartbeatResponse response = umbilical.heartbeat(request);
-    eventsToSend.clear();
-    eventCounter += response.getEvents().size();
-    eventsToBeProcessed.addAll(response.getEvents());
-    eventsToBeProcessed.notifyAll();
-  }
-
-  /**
-   * Hook to ask the Tez AM for the next task to be run on the Container
-   * @return Next task to be run
-   * @throws IOException
-   */
-  public synchronized ContainerTask getNextTask(
-      ContainerContext containerContext) throws IOException {
-    ContainerTask task = umbilical.getTask(containerContext);
-    if (task.getTaskSpec().getTaskAttemptID() != currentTaskAttemptID) {
-      currentTaskAttemptID = task.getTaskSpec().getTaskAttemptID();
-    }
-    return task;
-  }
-
-  /**
-   * Hook to query the Tez AM whether a particular Task Attempt can commit its
-   * output.
-   * @param attemptID Attempt ID of the Task that is waiting to commit.
-   * attempts can commit.
-   * @throws IOException
-   */
-  public synchronized boolean canCommit(TezTaskAttemptID attemptID)
-      throws IOException {
-    return umbilical.canCommit(attemptID);
-  }
-
-  /**
-   * Inform the Tez AM that an attempt has failed.
-   * @param attemptID Task Attempt ID of the failed attempt.
-   * @param taskFailedEvent Event with details on the attempt failure.
-   * @throws IOException
-   */
-  public synchronized void taskFailed(TezTaskAttemptID attemptID,
-      TezEvent taskFailedEvent) throws IOException {
-    umbilical.taskFailed(attemptID, taskFailedEvent);
-  }
+  public void addEvents(Collection<TezEvent> events);
 
-  public synchronized void addEvents(Collection<TezEvent> events) {
-    eventsToSend.addAll(events);
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
index 7b0eb45..f077831 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newruntime/LogicalIOProcessorRuntimeTask.java
@@ -44,9 +44,11 @@ import org.apache.tez.engine.newapi.TezProcessorContext;
 import org.apache.tez.engine.newapi.impl.InputSpec;
 import org.apache.tez.engine.newapi.impl.OutputSpec;
 import org.apache.tez.engine.newapi.impl.TaskSpec;
+import org.apache.tez.engine.newapi.impl.TezEvent;
 import org.apache.tez.engine.newapi.impl.TezInputContextImpl;
 import org.apache.tez.engine.newapi.impl.TezOutputContextImpl;
 import org.apache.tez.engine.newapi.impl.TezProcessorContextImpl;
+import org.apache.tez.engine.newapi.impl.TezUmbilical;
 
 import com.google.common.base.Preconditions;
 
@@ -62,6 +64,7 @@ public class LogicalIOProcessorRuntimeTask {
 
   private final TaskSpec taskSpec;
   private final Configuration tezConf;
+  private final TezUmbilical tezUmbilical;
 
   private final List<InputSpec> inputSpecs;
   private final List<LogicalInput> inputs;
@@ -85,11 +88,13 @@ public class LogicalIOProcessorRuntimeTask {
   private Map<String, List<Event>> closeInputEventMap;
   private Map<String, List<Event>> closeOutputEventMap;
 
-  public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, Configuration tezConf) {
+  public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, Configuration tezConf,
+      TezUmbilical tezUmbilical) {
     LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: "
         + taskSpec);
     this.taskSpec = taskSpec;
     this.tezConf = tezConf;
+    this.tezUmbilical = tezUmbilical;
     this.inputSpecs = taskSpec.getInputs();
     this.inputs = createInputs(inputSpecs);
     this.outputSpecs = taskSpec.getOutputs();
@@ -215,7 +220,7 @@ public class LogicalIOProcessorRuntimeTask {
 
   private TezInputContext createInputContext(InputSpec inputSpec) {
     TezInputContext inputContext = new TezInputContextImpl(tezConf,
-        taskSpec.getVertexName(), inputSpec.getSourceVertexName(),
+        tezUmbilical, taskSpec.getVertexName(), inputSpec.getSourceVertexName(),
         taskSpec.getTaskAttemptID(), tezCounters,
         inputSpec.getInputDescriptor().getUserPayload());
     return inputContext;
@@ -223,7 +228,8 @@ public class LogicalIOProcessorRuntimeTask {
 
   private TezOutputContext createOutputContext(OutputSpec outputSpec) {
     TezOutputContext outputContext = new TezOutputContextImpl(tezConf,
-        taskSpec.getVertexName(), outputSpec.getDestinationVertexName(),
+        tezUmbilical, taskSpec.getVertexName(),
+        outputSpec.getDestinationVertexName(),
         taskSpec.getTaskAttemptID(), tezCounters,
         outputSpec.getOutputDescriptor().getUserPayload());
     return outputContext;
@@ -231,8 +237,8 @@ public class LogicalIOProcessorRuntimeTask {
 
   private TezProcessorContext createProcessorContext() {
     TezProcessorContext processorContext = new TezProcessorContextImpl(tezConf,
-        taskSpec.getVertexName(), taskSpec.getTaskAttemptID(), tezCounters,
-        processorDescriptor.getUserPayload());
+        tezUmbilical, taskSpec.getVertexName(), taskSpec.getTaskAttemptID(),
+        tezCounters, processorDescriptor.getUserPayload());
     return processorContext;
   }
 
@@ -245,9 +251,9 @@ public class LogicalIOProcessorRuntimeTask {
       if (input instanceof LogicalInput) {
         inputs.add((LogicalInput) input);
       } else {
-        throw new TezUncheckedException(
-            input.getClass().getName()
-                + " is not a sub-type of LogicalInput. Only LogicalInput sub-types supported by a LogicalIOProcessor.");
+        throw new TezUncheckedException(input.getClass().getName()
+            + " is not a sub-type of LogicalInput."
+            + " Only LogicalInput sub-types supported by LogicalIOProcessor.");
       }
 
     }
@@ -263,9 +269,9 @@ public class LogicalIOProcessorRuntimeTask {
       if (output instanceof LogicalOutput) {
         outputs.add((LogicalOutput) output);
       } else {
-        throw new TezUncheckedException(
-            output.getClass().getName()
-                + " is not a sub-type of LogicalOutput. Only LogicalOutput sub-types supported by a LogicalIOProcessor.");
+        throw new TezUncheckedException(output.getClass().getName()
+            + " is not a sub-type of LogicalOutput."
+            + " Only LogicalOutput sub-types supported by LogicalIOProcessor.");
       }
     }
     return outputs;
@@ -276,10 +282,14 @@ public class LogicalIOProcessorRuntimeTask {
     Processor processor = RuntimeUtils.createClazzInstance(processorDescriptor
         .getClassName());
     if (!(processor instanceof LogicalIOProcessor)) {
-      throw new TezUncheckedException(
-          processor.getClass().getName()
-              + " is not a sub-type of LogicalIOProcessor. Only LogicalIOProcessor sub-types supported at the moment");
+      throw new TezUncheckedException(processor.getClass().getName()
+          + " is not a sub-type of LogicalIOProcessor."
+          + " Only LogicalOutput sub-types supported by LogicalIOProcessor.");
     }
     return (LogicalIOProcessor) processor;
   }
+
+  public void handleEvent(TezEvent e) {
+    // TODO TODONEWTEZ
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-engine/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/proto/Events.proto b/tez-engine/src/main/proto/Events.proto
index fe6e449..fa9cb2c 100644
--- a/tez-engine/src/main/proto/Events.proto
+++ b/tez-engine/src/main/proto/Events.proto
@@ -20,6 +20,9 @@ option java_package = "org.apache.tez.engine.api.events";
 option java_outer_classname = "SystemEventProtos";
 option java_generate_equals_and_hash = true;
 
-message TaskFailedEventProto {
+message TaskAttemptFailedEventProto {
   optional string diagnostics = 1;
 }
+
+message TaskAttemptCompletedEventProto {
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index 16cc8db..204f517 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -682,9 +682,17 @@ public class LocalJobRunnerTez implements ClientProtocol {
       return null;
     }
 
+
     @Override
-    public void taskFailed(TezTaskAttemptID attemptID, TezEvent taskFailedEvent)
-        throws IOException {
+    public void taskAttemptFailed(TezTaskAttemptID attemptID,
+        TezEvent taskFailedEvent) throws IOException {
+      // TODO Auto-generated method stub
+      // TODO TODONEWTEZ
+    }
+
+    @Override
+    public void taskAttemptCompleted(TezTaskAttemptID attemptID,
+        TezEvent taskAttemptCompletedEvent) throws IOException {
       // TODO Auto-generated method stub
       // TODO TODONEWTEZ
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/ee47464d/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
index da68776..1a40ead 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
@@ -156,8 +156,15 @@ public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
   }
 
   @Override
-  public void taskFailed(TezTaskAttemptID attemptID, TezEvent taskFailedEvent)
-      throws IOException {
+  public void taskAttemptFailed(TezTaskAttemptID attemptID,
+      TezEvent taskFailedEvent) throws IOException {
+    // TODO Auto-generated method stub
+    // TODO TODONEWTEZ
+  }
+
+  @Override
+  public void taskAttemptCompleted(TezTaskAttemptID attemptID,
+      TezEvent taskAttemptCompletedEvent) throws IOException {
     // TODO Auto-generated method stub
     // TODO TODONEWTEZ
   }