You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/09/09 06:11:37 UTC

[2/2] git commit: TEZ-423. Move new re-factor classes to appropriate module. (hitesh)

TEZ-423. Move new re-factor classes to appropriate module. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1cc83e45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1cc83e45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1cc83e45

Branch: refs/heads/TEZ-398
Commit: 1cc83e457b8a86aab801b526c79652fa99e83186
Parents: bc9cee3
Author: Hitesh Shah <hi...@apache.org>
Authored: Sun Sep 8 21:09:45 2013 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Sun Sep 8 21:09:45 2013 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/ContainerTask.java    | 102 ---------
 .../apache/tez/dag/api/TezConfiguration.java    |   8 +
 tez-dag/pom.xml                                 |   4 -
 .../apache/hadoop/mapred/YarnTezDagChild.java   |  61 ++---
 .../dag/app/TaskAttemptListenerImpTezDag.java   |  43 +++-
 .../java/org/apache/tez/dag/app/dag/Vertex.java |  18 +-
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  20 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  16 +-
 .../app/rm/AMSchedulerEventTALaunchRequest.java |  13 +-
 .../dag/app/rm/TaskSchedulerEventHandler.java   |   2 +-
 .../rm/container/AMContainerEventAssignTA.java  |  22 +-
 .../dag/app/rm/container/AMContainerImpl.java   |  12 +-
 .../dag/app/rm/container/AMContainerTask.java   |   8 +-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   |   7 +-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  44 ++--
 .../tez/dag/app/rm/TestContainerReuse.java      |  64 +++---
 .../dag/app/rm/container/TestAMContainer.java   |   4 +-
 tez-engine-api/pom.xml                          |  31 +++
 .../engine/newapi/events/DataMovementEvent.java | 109 +++++++++
 .../newapi/events/InputDataErrorEvent.java      |  15 +-
 .../newapi/events/TaskCommunicationEvent.java   |  84 -------
 .../tez/engine/newapi/impl/EventMetaData.java   |  81 -------
 .../tez/engine/newapi/impl/TezUmbilical.java    |  61 -----
 .../tez/engine/newapi/rpc/impl/InputSpec.java   |  51 -----
 .../tez/engine/newapi/rpc/impl/OutputSpec.java  |  48 ----
 .../tez/engine/newapi/rpc/impl/TaskSpec.java    |  66 ------
 .../tez/engine/newapi/rpc/impl/TezEvent.java    |  64 ------
 tez-engine-api/src/main/proto/Events.proto      |  34 +++
 tez-engine/pom.xml                              |  31 +++
 .../org/apache/tez/common/ContainerTask.java    |  74 +++++++
 .../tez/common/TezTaskUmbilicalProtocol.java    |  40 ++--
 .../engine/newapi/events/TaskFailedEvent.java   |  35 +++
 .../tez/engine/newapi/impl/EventMetaData.java   | 120 ++++++++++
 .../tez/engine/newapi/impl/EventType.java       |  25 +++
 .../tez/engine/newapi/impl/InputSpec.java       |  81 +++++++
 .../tez/engine/newapi/impl/OutputSpec.java      |  81 +++++++
 .../apache/tez/engine/newapi/impl/TaskSpec.java | 139 ++++++++++++
 .../apache/tez/engine/newapi/impl/TezEvent.java | 182 +++++++++++++++
 .../engine/newapi/impl/TezHeartbeatRequest.java | 102 +++++++++
 .../newapi/impl/TezHeartbeatResponse.java       |  37 ++++
 .../tez/engine/newapi/impl/TezUmbilical.java    | 220 +++++++++++++++++++
 .../LogicalIOProcessorRuntimeTask.java          |  18 +-
 tez-engine/src/main/proto/Events.proto          |  25 +++
 .../apache/hadoop/mapred/LocalJobRunnerTez.java |  17 ++
 .../tez/mapreduce/TestUmbilicalProtocol.java    |  25 ++-
 45 files changed, 1608 insertions(+), 736 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-common/src/main/java/org/apache/tez/common/ContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/ContainerTask.java b/tez-common/src/main/java/org/apache/tez/common/ContainerTask.java
deleted file mode 100644
index 69bd2b4..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/ContainerTask.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.common;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-
-public class ContainerTask implements Writable {
-
-  TezTaskContext tezTaskContext;
-  boolean shouldDie;
-
-  public ContainerTask() {
-  }
-
-  public ContainerTask(TezTaskContext tezTaskContext, boolean shouldDie) {
-    this.tezTaskContext = tezTaskContext;
-    this.shouldDie = shouldDie;
-  }
-
-  public TezTaskContext getTezEngineTaskContext() {
-    return tezTaskContext;
-  }
-
-  public boolean shouldDie() {
-    return shouldDie;
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeBoolean(shouldDie);
-    if (tezTaskContext != null) {
-      out.writeBoolean(true);
-      Text.writeString(out, tezTaskContext.getClass().getName());
-      tezTaskContext.write(out);
-    } else {
-      out.writeBoolean(false);
-    }
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    shouldDie = in.readBoolean();
-    boolean taskComing = in.readBoolean();
-    if (taskComing) {
-      String contextClass = Text.readString(in);
-      tezTaskContext = createEmptyContext(contextClass);
-      tezTaskContext.readFields(in);
-    }
-  }
-
-  private TezTaskContext createEmptyContext(String contextClassName)
-      throws IOException {
-    try {
-      Class<?> clazz = Class.forName(contextClassName);
-      Constructor<?> c = clazz.getConstructor(null);
-      c.setAccessible(true);
-      return (TezTaskContext) c.newInstance(null);
-    } catch (ClassNotFoundException e) {
-      throw new IOException(e);
-    } catch (SecurityException e) {
-      throw new IOException(e);
-    } catch (NoSuchMethodException e) {
-      throw new IOException(e);
-    } catch (IllegalArgumentException e) {
-      throw new IOException(e);
-    } catch (InstantiationException e) {
-      throw new IOException(e);
-    } catch (IllegalAccessException e) {
-      throw new IOException(e);
-    } catch (InvocationTargetException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "shouldDie: " + shouldDie + ", tezEngineTaskContext: "
-        + tezTaskContext;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index cc41856..e40f4f5 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -164,6 +164,14 @@ public class TezConfiguration extends Configuration {
       + "get-task.sleep.interval-ms.max";
   public static final int TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT = 500;
 
+  public static final String TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS = TEZ_TASK_PREFIX
+      + "am.heartbeat.interval-ms.max";
+  public static final int TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT = 1000;
+
+  public static final String TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT = TEZ_TASK_PREFIX
+      + "max-events-per-heartbeat.max";
+  public static final int TEZ_TASK_MAX_EVENTS_PER_HEARTBEAT_DEFAULT = 100;
+
   /**
    * Configuration to specify whether container should be reused.
    */

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag/pom.xml b/tez-dag/pom.xml
index f9ec145..875a196 100644
--- a/tez-dag/pom.xml
+++ b/tez-dag/pom.xml
@@ -101,10 +101,6 @@
       <groupId>com.google.inject</groupId>
       <artifactId>guice</artifactId>
     </dependency>
-    <dependency>
-     <groupId>com.google.protobuf</groupId>
-     <artifactId>protobuf-java</artifactId>
-    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
index 9f59219..2bac9b6 100644
--- a/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
+++ b/tez-dag/src/main/java/org/apache/hadoop/mapred/YarnTezDagChild.java
@@ -49,13 +49,12 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.tez.common.ContainerContext;
 import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
-import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezTaskUmbilicalProtocol;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.Limits;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -65,7 +64,9 @@ import org.apache.tez.engine.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.engine.common.objectregistry.ObjectRegistryModule;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.TokenCache;
-import org.apache.tez.engine.runtime.RuntimeUtils;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
 import org.apache.tez.engine.task.RuntimeTask;
 import org.apache.tez.mapreduce.input.SimpleInput;
 import org.apache.tez.mapreduce.output.SimpleOutput;
@@ -147,11 +148,12 @@ public class YarnTezDagChild {
     if (LOG.isDebugEnabled()) {
       LOG.debug("PID, containerId: " + pid + ", " + containerIdentifier);
     }
-    TezEngineTaskContext taskContext = null;
+    TaskSpec taskSpec = null;
     ContainerTask containerTask = null;
     UserGroupInformation childUGI = null;
     TezTaskAttemptID taskAttemptId = null;
-    ContainerContext containerContext = new ContainerContext(containerIdentifier, pid);
+    ContainerContext containerContext = new ContainerContext(
+        containerIdentifier, pid);
     int getTaskMaxSleepTime = defaultConf.getInt(
         TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX,
         TezConfiguration.TEZ_TASK_GET_TASK_SLEEP_INTERVAL_MS_MAX_DEFAULT);
@@ -173,19 +175,18 @@ public class YarnTezDagChild {
         LOG.info("TaskInfo: shouldDie: "
             + containerTask.shouldDie()
             + (containerTask.shouldDie() == true ? "" : ", taskAttemptId: "
-                + containerTask.getTezEngineTaskContext().getTaskAttemptId()));
+                + containerTask.getTaskSpec().getTaskAttemptID()));
 
         if (containerTask.shouldDie()) {
           return;
         }
         taskCount++;
-        taskContext = (TezEngineTaskContext) containerTask
-            .getTezEngineTaskContext();
+        taskSpec = containerTask.getTaskSpec();
         if (LOG.isDebugEnabled()) {
           LOG.debug("New container task context:"
-              + taskContext.toString());
+              + taskSpec.toString());
         }
-        taskAttemptId = taskContext.getTaskAttemptId();
+        taskAttemptId = taskSpec.getTaskAttemptID();
         TezVertexID newVertexId = taskAttemptId.getTaskID().getVertexID();
 
         if (currentVertexId != null) {
@@ -200,7 +201,7 @@ public class YarnTezDagChild {
 
         updateLoggers(taskAttemptId);
 
-        final Task t = createAndConfigureTezTask(taskContext, umbilical,
+        final Task t = createAndConfigureTezTask(taskSpec, umbilical,
             credentials, jobToken, attemptNumber);
 
         final Configuration conf = ((RuntimeTask)t).getConfiguration();
@@ -297,7 +298,7 @@ public class YarnTezDagChild {
   }
 
   private static Task createAndConfigureTezTask(
-      TezEngineTaskContext taskContext, TezTaskUmbilicalProtocol master,
+      TaskSpec taskSpec, TezTaskUmbilicalProtocol master,
       Credentials cxredentials, Token<JobTokenIdentifier> jobToken,
       int appAttemptId) throws IOException, InterruptedException {
 
@@ -308,19 +309,29 @@ public class YarnTezDagChild {
 
     configureLocalDirs(conf);
 
-
     // FIXME need Input/Output vertices else we have this hack
-    if (taskContext.getInputSpecList().isEmpty()) {
-      taskContext.getInputSpecList().add(
-          new InputSpec("null", 0, SimpleInput.class.getName()));
+    if (taskSpec.getInputs().isEmpty()) {
+      InputDescriptor simpleInputDesc =
+          new InputDescriptor(SimpleInput.class.getName());
+      simpleInputDesc.setUserPayload(
+          taskSpec.getProcessorDescriptor().getUserPayload());
+      taskSpec.getInputs().add(
+          new InputSpec("null", simpleInputDesc, 0));
     }
-    if (taskContext.getOutputSpecList().isEmpty()) {
-      taskContext.getOutputSpecList().add(
-          new OutputSpec("null", 0, SimpleOutput.class.getName()));
+    if (taskSpec.getOutputs().isEmpty()) {
+      OutputDescriptor simpleOutputDesc =
+          new OutputDescriptor(SimpleOutput.class.getName());
+      simpleOutputDesc.setUserPayload(
+          taskSpec.getProcessorDescriptor().getUserPayload());
+      taskSpec.getOutputs().add(
+          new OutputSpec("null", simpleOutputDesc, 0));
     }
-    Task t = RuntimeUtils.createRuntimeTask(taskContext);
-    
-    t.initialize(conf, taskContext.getProcessorUserPayload(), master);
+    Task t = null;
+
+    // FIXME TODONEWTEZ
+
+    // RuntimeUtils.createRuntimeTask(taskSpec);
+    // t.initialize(conf, taskSpec.getProcessorUserPayload(), master);
 
     // FIXME wrapper should initialize all of processor, inputs and outputs
     // Currently, processor is inited via task init
@@ -353,13 +364,13 @@ public class YarnTezDagChild {
       }
     }
   }
-  
+
   private static void updateLoggers(TezTaskAttemptID tezTaskAttemptID)
       throws FileNotFoundException {
     String containerLogDir = null;
 
     LOG.info("Redirecting log files based on TaskAttemptId: " + tezTaskAttemptID);
-    
+
     Appender appender = Logger.getRootLogger().getAppender(
         TezConfiguration.TEZ_CONTAINER_LOGGER_NAME);
     if (appender != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 619a494..6d64a58 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -58,6 +59,10 @@ import org.apache.tez.dag.app.rm.container.AMContainerTask;
 import org.apache.tez.dag.app.security.authorize.MRAMPolicyProvider;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.common.security.JobTokenSecretManager;
+import org.apache.tez.engine.newapi.events.TaskFailedEvent;
+import org.apache.tez.engine.newapi.impl.TezEvent;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
+import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
 import org.apache.tez.engine.records.OutputContext;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
@@ -210,7 +215,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
               + " is invalid and will be killed");
         else
           LOG.info("Container with id: " + containerId
-              + " is valid and will be killed");              
+              + " is valid and will be killed");
         task = TASK_FOR_INVALID_JVM;
       } else {
         pingContainerHeartbeatHandler(containerId);
@@ -224,16 +229,16 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
             LOG.info("No task currently assigned to Container with id: "
                 + containerId);
           } else {
-            registerTaskAttempt(taskContext.getTask().getTaskAttemptId(),
+            registerTaskAttempt(taskContext.getTask().getTaskAttemptID(),
                 containerId);
             task = new ContainerTask(taskContext.getTask(), false);
             context.getEventHandler().handle(
                 new TaskAttemptEventStartedRemotely(taskContext.getTask()
-                    .getTaskAttemptId(), containerId, context
+                    .getTaskAttemptID(), containerId, context
                     .getApplicationACLs(), context.getAllContainers()
                     .get(containerId).getShufflePort()));
             LOG.info("Container with id: " + containerId + " given task: "
-                + taskContext.getTask().getTaskAttemptId());
+                + taskContext.getTask().getTaskAttemptID());
           }
         }
       }
@@ -474,10 +479,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
     // between polls (MRTask) implies tasks end up wasting upto 1 second doing
     // nothing. Similarly for CA_COMMIT.
 
-    DAG job = context.getCurrentDAG();
-    Task task =
-        job.getVertex(taskAttemptId.getTaskID().getVertexID()).
-            getTask(taskAttemptId.getTaskID());
 
     // TODO In-Memory Shuffle
     /*
@@ -563,4 +564,30 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
           + ", ContainerId not known for this attempt");
     }
   }
+
+  @Override
+  public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
+      throws IOException, TezException {
+    // TODO TODONEWTEZ Auto-generated method stub
+    TezTaskAttemptID taskAttemptID = request.getCurrentTaskAttemptID();
+    LOG.info("Ping from " + taskAttemptID.toString());
+    taskHeartbeatHandler.pinged(taskAttemptID);
+    pingContainerHeartbeatHandler(taskAttemptID);
+    return null;
+  }
+
+  @Override
+  public void taskFailed(TezTaskAttemptID taskAttemptId,
+      TezEvent tezEvent) throws IOException {
+    TaskFailedEvent taskFailedEvent = (TaskFailedEvent) tezEvent.getEvent();
+    LOG.fatal("Task: " + taskAttemptId + " - failed : "
+        + taskFailedEvent.getDiagnostics());
+    reportDiagnosticInfo(taskAttemptId, "Error: "
+        + taskFailedEvent.getDiagnostics());
+
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(taskAttemptId, TaskAttemptEventType.TA_FAILED));
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index a33ab91..48c9993 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -23,8 +23,6 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -34,23 +32,25 @@ import org.apache.tez.dag.api.client.VertexStatusBuilder;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
 
 /**
- * Main interface to interact with the job. Provides only getters. 
+ * Main interface to interact with the job. Provides only getters.
  */
 public interface Vertex extends Comparable<Vertex> {
 
   TezVertexID getVertexId();
   public VertexPlan getVertexPlan();
-  
+
   int getDistanceFromRoot();
   String getName();
   VertexState getState();
 
   /**
-   * Get all the counters of this vertex. 
+   * Get all the counters of this vertex.
    * @return aggregate task-counters
    */
   TezCounters getAllCounters();
@@ -64,18 +64,18 @@ public interface Vertex extends Comparable<Vertex> {
   float getProgress();
   ProgressBuilder getVertexProgress();
   VertexStatusBuilder getVertexStatus();
-  
+
   void setParallelism(int parallelism, List<byte[]> taskUserPayloads);
-  
+
   TezDependentTaskCompletionEvent[] getTaskAttemptCompletionEvents(
       TezTaskAttemptID attemptId, int fromEventId, int maxEvents);
-  
+
   void setInputVertices(Map<Vertex, EdgeProperty> inVertices);
   void setOutputVertices(Map<Vertex, EdgeProperty> outVertices);
 
   Map<Vertex, EdgeProperty> getInputVertices();
   Map<Vertex, EdgeProperty> getOutputVertices();
-  
+
   List<InputSpec> getInputSpecList();
   List<OutputSpec> getOutputSpecList();
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index ae3c05f..00ef9e5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -47,8 +47,6 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.ProcessorDescriptor;
@@ -94,6 +92,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.dag.utils.TezBuilderUtils;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -294,14 +293,13 @@ public class TaskAttemptImpl implements TaskAttempt,
     return getVertexID().getDAGId();
   }
 
-  TezTaskContext createRemoteTask() {
+  TaskSpec createRemoteTaskSpec() {
     Vertex vertex = getVertex();
     ProcessorDescriptor procDesc = vertex.getProcessorDescriptor();
     DAG dag = vertex.getDAG();
-
-    return new TezEngineTaskContext(getID(), dag.getUserName(),
-        dag.getName(), vertex.getName(), procDesc,
-        vertex.getInputSpecList(), vertex.getOutputSpecList());
+    return new TaskSpec(getID(), dag.getUserName(),
+        vertex.getName(), procDesc, vertex.getInputSpecList(),
+        vertex.getOutputSpecList());
   }
 
   @Override
@@ -507,7 +505,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         .getVertex(attemptId.getTaskID().getVertexID())
         .getTask(attemptId.getTaskID());
   }
-  
+
   Vertex getVertex() {
     return appContext.getCurrentDAG()
         .getVertex(attemptId.getTaskID().getVertexID());
@@ -856,7 +854,7 @@ public class TaskAttemptImpl implements TaskAttempt,
       // recovery.
 
       // Create the remote task.
-      TezTaskContext remoteTaskContext = ta.createRemoteTask();
+      TaskSpec remoteTaskSpec = ta.createRemoteTaskSpec();
       // Create startTaskRequest
 
       String[] requestHosts = new String[0];
@@ -889,12 +887,12 @@ public class TaskAttemptImpl implements TaskAttempt,
 
       if (LOG.isDebugEnabled()) {
         LOG.debug("Asking for container launch with taskAttemptContext: "
-            + remoteTaskContext);
+            + remoteTaskSpec);
       }
       // Send out a launch request to the scheduler.
 
       AMSchedulerEventTALaunchRequest launchRequestEvent = new AMSchedulerEventTALaunchRequest(
-          ta.attemptId, ta.taskResource, remoteTaskContext, ta, requestHosts,
+          ta.attemptId, ta.taskResource, remoteTaskSpec, ta, requestHosts,
           requestRacks, scheduleEvent.getPriority(), ta.containerContext);
       ta.sendEvent(launchRequestEvent);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index b20ac2a..a040ff2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -50,8 +50,6 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -103,6 +101,8 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -656,7 +656,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       readLock.unlock();
     }
   }
-  
+
   @Override
   public void setParallelism(int parallelism, List<byte[]> taskUserPayloads) {
     writeLock.lock();
@@ -1413,7 +1413,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   public int getOutputVerticesCount() {
     return this.targetVertices.size();
   }
-  
+
   @Override
   public ProcessorDescriptor getProcessorDescriptor() {
     return processorDescriptor;
@@ -1453,8 +1453,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         this.getInputVerticesCount());
     for (Entry<Vertex, EdgeProperty> entry : this.getInputVertices().entrySet()) {
       InputSpec inputSpec = new InputSpec(entry.getKey().getName(),
-          entry.getKey().getTotalTasks(),
-          entry.getValue().getEdgeDestination().getClassName());
+          entry.getValue().getEdgeDestination(), entry.getKey().getTotalTasks());
       if (LOG.isDebugEnabled()) {
         LOG.debug("For vertex : " + this.getName()
             + ", Using InputSpec : " + inputSpec);
@@ -1472,8 +1471,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
       outputSpecList = new ArrayList<OutputSpec>(this.getOutputVerticesCount());
       for (Entry<Vertex, EdgeProperty> entry : this.getOutputVertices().entrySet()) {
         OutputSpec outputSpec = new OutputSpec(entry.getKey().getName(),
-            entry.getKey().getTotalTasks(),
-            entry.getValue().getEdgeSource().getClassName());
+            entry.getValue().getEdgeSource(), entry.getKey().getTotalTasks());
         if (LOG.isDebugEnabled()) {
           LOG.debug("For vertex : " + this.getName()
               + ", Using OutputSpec : " + outputSpec);
@@ -1500,7 +1498,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
   VertexScheduler getVertexScheduler() {
     return this.vertexScheduler;
   }
-  
+
   private static void logLocationHints(VertexLocationHint locationHint) {
     Multiset<String> hosts = HashMultiset.create();
     Multiset<String> racks = HashMultiset.create();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
index f66c1a2..1c30b0b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTALaunchRequest.java
@@ -19,10 +19,10 @@ package org.apache.tez.dag.app.rm;
 
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
 
 public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
 
@@ -35,19 +35,18 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
   private final Priority priority;
   private final Resource capability;
   private final ContainerContext containerContext;
-  
 
-  private final TezTaskContext remoteTaskContext;
+  private final TaskSpec remoteTaskSpec;
   private final TaskAttempt taskAttempt;
 
   public AMSchedulerEventTALaunchRequest(TezTaskAttemptID attemptId,
       Resource capability,
-      TezTaskContext remoteTaskContext, TaskAttempt ta,
+      TaskSpec remoteTaskSpec, TaskAttempt ta,
       String[] hosts, String[] racks, Priority priority, ContainerContext containerContext) {
     super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST);
     this.attemptId = attemptId;
     this.capability = capability;
-    this.remoteTaskContext = remoteTaskContext;
+    this.remoteTaskSpec = remoteTaskSpec;
     this.taskAttempt = ta;
     this.hosts = hosts;
     this.racks = racks;
@@ -75,8 +74,8 @@ public class AMSchedulerEventTALaunchRequest extends AMSchedulerEvent {
     return priority;
   }
 
-  public TezTaskContext getRemoteTaskContext() {
-    return remoteTaskContext;
+  public TaskSpec getRemoteTaskSpec() {
+    return remoteTaskSpec;
   }
 
   public TaskAttempt getTaskAttempt() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 1cb6de6..9e059a9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -400,7 +400,7 @@ public class TaskSchedulerEventHandler extends AbstractService
     }
     sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
     sendEvent(new AMContainerEventAssignTA(containerId,
-        taskAttempt.getID(), event.getRemoteTaskContext()));
+        taskAttempt.getID(), event.getRemoteTaskSpec()));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
index 0fe5b3b..dd178fb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventAssignTA.java
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -18,26 +18,26 @@
 package org.apache.tez.dag.app.rm.container;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
 
 public class AMContainerEventAssignTA extends AMContainerEvent {
 
   private final TezTaskAttemptID attemptId;
   // TODO Maybe have tht TAL pull the remoteTask from the TaskAttempt itself ?
-  private final TezTaskContext remoteTaskContext;
-  
+  private final TaskSpec remoteTaskSpec;
+
   public AMContainerEventAssignTA(ContainerId containerId,
-      TezTaskAttemptID attemptId, Object remoteTaskContext) {
+      TezTaskAttemptID attemptId, Object remoteTaskSpec) {
     super(containerId, AMContainerEventType.C_ASSIGN_TA);
     this.attemptId = attemptId;
-    this.remoteTaskContext = (TezTaskContext)remoteTaskContext;
+    this.remoteTaskSpec = (TaskSpec)remoteTaskSpec;
   }
-  
-  public TezTaskContext getRemoteTaskContext() {
-    return this.remoteTaskContext;
+
+  public TaskSpec getRemoteTaskSpec() {
+    return this.remoteTaskSpec;
   }
-  
+
   public TezTaskAttemptID getTaskAttemptId() {
     return this.attemptId;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 9d44c11..afb0ed5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
-import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.ContainerContext;
@@ -55,6 +54,7 @@ import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
 import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 //import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
 
 @SuppressWarnings("rawtypes")
 public class AMContainerImpl implements AMContainer {
@@ -74,8 +74,8 @@ public class AMContainerImpl implements AMContainer {
   private final List<TezTaskAttemptID> completedAttempts = new LinkedList<TezTaskAttemptID>();
 
   // TODO Maybe this should be pulled from the TaskAttempt.s
-  private final Map<TezTaskAttemptID, TezTaskContext> remoteTaskMap =
-      new HashMap<TezTaskAttemptID, TezTaskContext>();
+  private final Map<TezTaskAttemptID, TaskSpec> remoteTaskMap =
+      new HashMap<TezTaskAttemptID, TaskSpec>();
 
   // TODO ?? Convert to list and hash.
 
@@ -453,10 +453,10 @@ public class AMContainerImpl implements AMContainer {
       }
       container.pendingAttempt = event.getTaskAttemptId();
       if (LOG.isDebugEnabled()) {
-        LOG.debug("AssignTA: attempt: " + event.getRemoteTaskContext());
+        LOG.debug("AssignTA: attempt: " + event.getRemoteTaskSpec());
       }
       container.remoteTaskMap
-          .put(event.getTaskAttemptId(), event.getRemoteTaskContext());
+          .put(event.getTaskAttemptId(), event.getRemoteTaskSpec());
       return container.getState();
     }
   }
@@ -600,7 +600,7 @@ public class AMContainerImpl implements AMContainer {
         AMContainerImpl container, AMContainerEvent cEvent) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("AssignTAAtIdle: attempt: " +
-            ((AMContainerEventAssignTA) cEvent).getRemoteTaskContext());
+            ((AMContainerEventAssignTA) cEvent).getRemoteTaskSpec());
       }
       return super.transition(container, cEvent);
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
index 1189a34..be1c08e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerTask.java
@@ -18,13 +18,13 @@
 
 package org.apache.tez.dag.app.rm.container;
 
-import org.apache.tez.common.TezTaskContext;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
 
 public class AMContainerTask {
   private final boolean shouldDie;
-  private final TezTaskContext tezTask;
+  private final TaskSpec tezTask;
 
-  public AMContainerTask(boolean shouldDie, TezTaskContext tezTask) {
+  public AMContainerTask(boolean shouldDie, TaskSpec tezTask) {
     this.shouldDie = shouldDie;
     this.tezTask = tezTask;
   }
@@ -33,7 +33,7 @@ public class AMContainerTask {
     return this.shouldDie;
   }
 
-  public TezTaskContext getTask() {
+  public TaskSpec getTask() {
     return this.tezTask;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 3274a4a..fe97e10 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -83,6 +83,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -175,7 +176,7 @@ public class TestTaskAttempt {
         new SystemClock(), mock(TaskHeartbeatHandler.class),
         mock(AppContext.class), locationHint, false, Resource.newInstance(1024,
             1), createFakeContainerContext());
-        
+
     TaskAttemptImpl spyTa = spy(taImpl);
     when(spyTa.resolveHosts(hosts)).thenReturn(
         resolved.toArray(new String[3]));
@@ -766,7 +767,7 @@ public class TestTaskAttempt {
     }
 
     @Override
-    protected TezTaskContext createRemoteTask() {
+    protected TaskSpec createRemoteTaskSpec() {
       // FIXME
       return null;
     }
@@ -786,7 +787,7 @@ public class TestTaskAttempt {
         TaskAttemptState state) {
     }
   }
-  
+
   private static ContainerContext createFakeContainerContext() {
     return new ContainerContext(new HashMap<String, LocalResource>(),
         new Credentials(), new HashMap<String, String>(), "");

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 238b2bd..7e7a8a5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -613,38 +613,38 @@ public class TestVertexImpl {
     Assert.assertEquals(2, v3.getOutputVerticesCount());
 
     Assert.assertTrue("vertex1".equals(v3.getInputSpecList().get(0)
-        .getVertexName())
+        .getSourceVertexName())
         || "vertex2".equals(v3.getInputSpecList().get(0)
-            .getVertexName()));
+            .getSourceVertexName()));
     Assert.assertTrue("vertex1".equals(v3.getInputSpecList().get(1)
-        .getVertexName())
+        .getSourceVertexName())
         || "vertex2".equals(v3.getInputSpecList().get(1)
-            .getVertexName()));
+            .getSourceVertexName()));
     Assert.assertTrue("i3_v1".equals(v3.getInputSpecList().get(0)
-        .getInputClassName())
+        .getInputDescriptor().getClassName())
         || "i3_v2".equals(v3.getInputSpecList().get(0)
-            .getInputClassName()));
+            .getInputDescriptor().getClassName()));
     Assert.assertTrue("i3_v1".equals(v3.getInputSpecList().get(1)
-        .getInputClassName())
+        .getInputDescriptor().getClassName())
         || "i3_v2".equals(v3.getInputSpecList().get(1)
-            .getInputClassName()));
+            .getInputDescriptor().getClassName()));
 
     Assert.assertTrue("vertex4".equals(v3.getOutputSpecList().get(0)
-        .getVertexName())
+        .getDestinationVertexName())
         || "vertex5".equals(v3.getOutputSpecList().get(0)
-            .getVertexName()));
+            .getDestinationVertexName()));
     Assert.assertTrue("vertex4".equals(v3.getOutputSpecList().get(1)
-        .getVertexName())
+        .getDestinationVertexName())
         || "vertex5".equals(v3.getOutputSpecList().get(1)
-            .getVertexName()));
+            .getDestinationVertexName()));
     Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList().get(0)
-        .getOutputClassName())
+        .getOutputDescriptor().getClassName())
         || "o3_v5".equals(v3.getOutputSpecList().get(0)
-            .getOutputClassName()));
+            .getOutputDescriptor().getClassName()));
     Assert.assertTrue("o3_v4".equals(v3.getOutputSpecList().get(1)
-        .getOutputClassName())
+        .getOutputDescriptor().getClassName())
         || "o3_v5".equals(v3.getOutputSpecList().get(1)
-            .getOutputClassName()));
+            .getOutputDescriptor().getClassName()));
   }
 
   @Test(timeout = 5000)
@@ -654,7 +654,7 @@ public class TestVertexImpl {
     VertexImpl v = vertices.get("vertex2");
     startVertex(v);
   }
-  
+
   @Test//(timeout = 5000)
   public void testVertexSetParallelism() {
     VertexImpl v2 = vertices.get("vertex2");
@@ -663,9 +663,9 @@ public class TestVertexImpl {
     Map<TezTaskID, Task> tasks = v2.getTasks();
     Assert.assertEquals(2, tasks.size());
     TezTaskID firstTask = tasks.keySet().iterator().next();
-    
+
     startVertex(v2);
-    
+
     byte[] payload = new byte[0];
     List<byte[]> taskPayloads = Collections.singletonList(payload);
     v2.setParallelism(1, taskPayloads);
@@ -673,7 +673,7 @@ public class TestVertexImpl {
     Assert.assertEquals(1, tasks.size());
     // the last one is removed
     Assert.assertTrue(tasks.keySet().iterator().next().equals(firstTask));
-    
+
     VertexImpl v1 = vertices.get("vertex1");
     TezTaskID t1_v1 = new TezTaskID(v1.getVertexId(), 0);
     TezTaskAttemptID ta1_t1_v1 = new TezTaskAttemptID(t1_v1, 0);
@@ -681,7 +681,7 @@ public class TestVertexImpl {
     TezDependentTaskCompletionEvent cEvt1 =
         new TezDependentTaskCompletionEvent(1, ta1_t1_v1,
             Status.SUCCEEDED, "", 3, 0);
-    v2.handle( 
+    v2.handle(
         new VertexEventSourceTaskAttemptCompleted(v2.getVertexId(), cEvt1));
 
     TezTaskID t1_v2 = new TezTaskID(v2.getVertexId(), 0);
@@ -689,7 +689,7 @@ public class TestVertexImpl {
     TezDependentTaskCompletionEvent[] events =
         v2.getTaskAttemptCompletionEvents(ta1_t1_v2, 0, 100);
     Assert.assertEquals(1, events.length);
-    TezDependentTaskCompletionEvent clone = events[0]; 
+    TezDependentTaskCompletionEvent clone = events[0];
     // payload must be present in the first event
     Assert.assertEquals(payload, clone.getUserPayload());
     // event must be a copy

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index e232ec0..408f88a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -29,9 +29,9 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tez.common.InputSpec;
-import org.apache.tez.common.OutputSpec;
 import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
@@ -57,12 +57,15 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.newapi.impl.InputSpec;
+import org.apache.tez.engine.newapi.impl.OutputSpec;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
 
 public class TestContainerReuse {
-  
+
 
   @Test(timeout = 15000l)
   public void testDelayedReuseContainerBecomesAvailable() throws IOException, InterruptedException, ExecutionException {
@@ -77,7 +80,7 @@ public class TestContainerReuse {
     CapturingEventHandler eventHandler = new CapturingEventHandler();
     TezDAGID dagID = new TezDAGID("0", 0, 0);
     TezVertexID vertexID = new TezVertexID(dagID, 1);
-    
+
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     AMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
     String appUrl = "url";
@@ -143,9 +146,9 @@ public class TestContainerReuse {
     verify(rmClient, times(0)).releaseAssignedContainer(eq(containerHost1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
-    
+
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), TaskAttemptState.SUCCEEDED));
-    
+
     long currentTs = System.currentTimeMillis();
     Throwable exception = null;
     while (System.currentTimeMillis() < currentTs + 5000l) {
@@ -162,7 +165,7 @@ public class TestContainerReuse {
     taskScheduler.close();
     taskSchedulerEventHandler.close();
   }
-  
+
   @Test(timeout = 15000l)
   public void testDelayedReuseContainerNotAvailable() throws IOException, InterruptedException, ExecutionException {
     Configuration conf = new Configuration(new YarnConfiguration());
@@ -176,7 +179,7 @@ public class TestContainerReuse {
     CapturingEventHandler eventHandler = new CapturingEventHandler();
     TezDAGID dagID = new TezDAGID("0", 0, 0);
     TezVertexID vertexID = new TezVertexID(dagID, 1);
-    
+
     AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
     AMRMClientAsync<CookieContainerRequest> rmClient = spy(new AMRMClientAsyncForTest(rmClientCore, 100));
     String appUrl = "url";
@@ -202,28 +205,28 @@ public class TestContainerReuse {
     TaskSchedulerWithDrainableAppCallback taskScheduler = (TaskSchedulerWithDrainableAppCallback) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
         .getSpyTaskScheduler();
     TaskSchedulerAppCallbackDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
-    
+
     Resource resource = Resource.newInstance(1024, 1);
     Priority priority = Priority.newInstance(5);
     String [] host1 = {"host1"};
     String [] host2 = {"host2"};
-    
+
     String [] defaultRack = {"/default-rack"};
-    
+
     TezTaskAttemptID taID11 = new TezTaskAttemptID(new TezTaskID(vertexID, 1), 1);
     TezTaskAttemptID taID21 = new TezTaskAttemptID(new TezTaskID(vertexID, 2), 1);
     TezTaskAttemptID taID31 = new TezTaskAttemptID(new TezTaskID(vertexID, 3), 1);
     TaskAttempt ta11 = mock(TaskAttempt.class);
     TaskAttempt ta21 = mock(TaskAttempt.class);
     TaskAttempt ta31 = mock(TaskAttempt.class);
-    
+
     AMSchedulerEventTALaunchRequest lrTa11 = createLaunchRequestEvent(taID11, ta11, resource, host1, defaultRack, priority, conf);
     AMSchedulerEventTALaunchRequest lrTa21 = createLaunchRequestEvent(taID21, ta21, resource, host2, defaultRack, priority, conf);
     AMSchedulerEventTALaunchRequest lrTa31 = createLaunchRequestEvent(taID31, ta31, resource, host1, defaultRack, priority, conf);
-    
+
     taskSchedulerEventHandler.handleEvent(lrTa11);
     taskSchedulerEventHandler.handleEvent(lrTa21);
-    
+
     Container containerHost1 = createContainer(1, host1[0], resource, priority);
     Container containerHost2 = createContainer(2, host2[0], resource, priority);
 
@@ -231,10 +234,10 @@ public class TestContainerReuse {
     drainableAppCallback.drain();
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta11), any(Object.class), eq(containerHost1));
     verify(taskSchedulerEventHandler).taskAllocated(eq(ta21), any(Object.class), eq(containerHost2));
-    
+
     // Adding the event later so that task1 assigned to containerHost1 is deterministic.
     taskSchedulerEventHandler.handleEvent(lrTa31);
-   
+
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta21, containerHost2.getId(), TaskAttemptState.SUCCEEDED));
     drainableAppCallback.drain();
     verify(taskScheduler).deallocateTask(eq(ta21), eq(true));
@@ -242,7 +245,7 @@ public class TestContainerReuse {
     verify(rmClient, times(0)).releaseAssignedContainer(eq(containerHost2.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
-    
+
     long currentTs = System.currentTimeMillis();
     Throwable exception = null;
     while (System.currentTimeMillis() < currentTs + 5000l) {
@@ -385,7 +388,7 @@ public class TestContainerReuse {
     taskScheduler.close();
     taskSchedulerEventHandler.close();
   }
-  
+
   @Test(timeout = 10000l)
   public void testReuseNonLocalRequest() throws IOException, InterruptedException, ExecutionException {
     Configuration tezConf = new Configuration(new YarnConfiguration());
@@ -444,7 +447,7 @@ public class TestContainerReuse {
     TaskAttempt ta12 = mock(TaskAttempt.class);
     doReturn(vertexID).when(ta12).getVertexID();
     AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID12, ta12, resource1, emptyHosts, emptyRacks, priority, tezConf);
-    
+
     // Send launch request for task 1 onle, deterministic assignment to this task.
     taskSchedulerEventHandler.handleEvent(lrEvent11);
 
@@ -457,7 +460,7 @@ public class TestContainerReuse {
 
     // Send launch request for task2 (vertex2)
     taskSchedulerEventHandler.handleEvent(lrEvent12);
-    
+
     // Task assigned to container completed successfully. Container should be
     // assigned immediately to ta12, since there's no local requests (instead of
     // waiting for the re-use delay)
@@ -478,7 +481,7 @@ public class TestContainerReuse {
     taskScheduler.close();
     taskSchedulerEventHandler.close();
   }
-  
+
   @Test(timeout = 10000l)
   public void testNoReuseAcrossVertices() throws IOException, InterruptedException, ExecutionException {
     Configuration tezConf = new Configuration(new YarnConfiguration());
@@ -536,7 +539,7 @@ public class TestContainerReuse {
     TaskAttempt ta21 = mock(TaskAttempt.class);
     doReturn(vertexID2).when(ta21).getVertexID();
     AMSchedulerEventTALaunchRequest lrEvent21 = createLaunchRequestEvent(taID21, ta21, resource1, host1, racks, priority, tezConf);
-    
+
     // Send launch request for task 1 onle, deterministic assignment to this task.
     taskSchedulerEventHandler.handleEvent(lrEvent11);
 
@@ -549,7 +552,7 @@ public class TestContainerReuse {
 
     // Send launch request for task2 (vertex2)
     taskSchedulerEventHandler.handleEvent(lrEvent21);
-    
+
     // Task assigned to container completed successfully. Container should not be assigned to task21. Released since delay is 0.
     taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED));
     drainableAppCallback.drain();
@@ -575,14 +578,17 @@ public class TestContainerReuse {
       TezTaskAttemptID taID, TaskAttempt ta, Resource capability, String[] hosts,
       String[] racks, Priority priority, Configuration conf) {
 
-    ContainerContext containerContext = 
+    ContainerContext containerContext =
         new ContainerContext(new HashMap<String, LocalResource>(),
             new Credentials(), new HashMap<String, String>(), "");
-    AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(taID, capability, new TezEngineTaskContext(taID, "user", "jobName", "vertexName",
-        new ProcessorDescriptor("processorClassName"),
-        Collections.singletonList(new InputSpec("vertexName", 1,
-            "inputClassName")), Collections.singletonList(new OutputSpec(
-            "vertexName", 1, "outputClassName"))), ta, hosts, racks, priority, containerContext);
+    AMSchedulerEventTALaunchRequest lr = new AMSchedulerEventTALaunchRequest(taID, capability,
+        new TaskSpec(taID, "user", "vertexName",
+            new ProcessorDescriptor("processorClassName"),
+            Collections.singletonList(new InputSpec("vertexName",
+                new InputDescriptor("inputClassName"), 1)),
+            Collections.singletonList(new OutputSpec(
+            "vertexName", new OutputDescriptor("outputClassName"), 1))),
+            ta, hosts, racks, priority, containerContext);
     return lr;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 2d2945e..62b6c28 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -110,7 +110,7 @@ public class TestAMContainer {
     wc.verifyNoOutgoingEvents();
     assertFalse(pulledTask.shouldDie());
     assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask()
-        .getTaskAttemptId());
+        .getTaskAttemptID());
     assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
 
@@ -166,7 +166,7 @@ public class TestAMContainer {
     wc.verifyNoOutgoingEvents();
     assertFalse(pulledTask.shouldDie());
     assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask()
-        .getTaskAttemptId());
+        .getTaskAttemptID());
     assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/pom.xml
----------------------------------------------------------------------
diff --git a/tez-engine-api/pom.xml b/tez-engine-api/pom.xml
index 107bdc9..b19e96b 100644
--- a/tez-engine-api/pom.xml
+++ b/tez-engine-api/pom.xml
@@ -44,6 +44,10 @@
       <groupId>com.google.inject</groupId>
       <artifactId>guice</artifactId>
     </dependency>
+    <dependency>
+     <groupId>com.google.protobuf</groupId>
+     <artifactId>protobuf-java</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
@@ -54,6 +58,33 @@
         <configuration>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>Events.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-sources/java</output>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java
new file mode 100644
index 0000000..182e8dc
--- /dev/null
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/DataMovementEvent.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.newapi.events;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.tez.engine.newapi.Event;
+
+/**
+ * Event used by user code to send information between tasks. An output can
+ * generate an Event of this type to sending information regarding output data
+ * ( such as URI for file-based output data, port info in case of
+ * streaming-based data transfers ) to the Input on the destination vertex.
+ */
+public final class DataMovementEvent extends Event {
+
+  /**
+   * Index(i) of the i-th (physical) Input or Output that generated an Event.
+   * For a Processor-generated event, this is ignored.
+   */
+  private final int sourceIndex;
+
+  /**
+   * Index(i) of the i-th (physical) Input or Output that is meant to receive
+   * this Event. For a Processor event, this is ignored.
+   */
+  private int targetIndex;
+
+  /**
+   * User Payload for this Event
+   */
+  private final byte[] userPayload;
+
+  /**
+   * Version number to indicate what attempt generated this Event
+   */
+  private int version;
+
+  /**
+   * User Event constructor
+   * @param sourceIndex Index to identify the physical edge of the input/output
+   * that generated the event
+   * @param userPayload User Payload of the User Event
+   */
+  public DataMovementEvent(int sourceIndex,
+      byte[] userPayload) {
+    this.userPayload = userPayload;
+    this.sourceIndex = sourceIndex;
+  }
+
+  @Private
+  public DataMovementEvent(int sourceIndex,
+      int targetIndex,
+      byte[] userPayload) {
+    this.userPayload = userPayload;
+    this.sourceIndex = sourceIndex;
+    this.targetIndex = targetIndex;
+  }
+
+  /**
+   * Constructor for Processor-generated User Events
+   * @param userPayload
+   */
+  public DataMovementEvent(byte[] userPayload) {
+    this(-1, userPayload);
+  }
+
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  public int getSourceIndex() {
+    return sourceIndex;
+  }
+
+  public int getTargetIndex() {
+    return targetIndex;
+  }
+
+  @Private
+  void setTargetIndex(int targetIndex) {
+    this.targetIndex = targetIndex;
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+  @Private
+  public void setVersion(int version) {
+    this.version = version;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
index 4528e4d..c5a92b8 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/InputDataErrorEvent.java
@@ -24,7 +24,7 @@ import org.apache.tez.engine.newapi.Event;
  * Event generated by an Input to indicate error when trying to
  * retrieve data.
  */
-public class InputDataErrorEvent extends Event {
+public final class InputDataErrorEvent extends Event {
 
   /**
    * Diagnostics/trace of the error that occurred on the Input's edge.
@@ -36,10 +36,17 @@ public class InputDataErrorEvent extends Event {
    */
   private final int index;
 
-  protected InputDataErrorEvent(String diagnostics, int index) {
+  /**
+   * Version of the data on which the error occurred.
+   */
+  private final int version;
+
+  public InputDataErrorEvent(String diagnostics, int index,
+      int version) {
     super();
     this.diagnostics = diagnostics;
     this.index = index;
+    this.version = version;
   }
 
   public String getDiagnostics() {
@@ -50,4 +57,8 @@ public class InputDataErrorEvent extends Event {
     return index;
   }
 
+  public int getVersion() {
+    return version;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
deleted file mode 100644
index 9d8f01a..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/events/TaskCommunicationEvent.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.events;
-
-import org.apache.tez.engine.newapi.Event;
-
-/**
- * Event used by user code to send information between tasks. An output can
- * generate an Event of this type to sending information regarding output data
- * ( such as URI for file-based output data, port info in case of
- * streaming-based data transfers ) to the Input on the destination vertex.
- */
-public final class TaskCommunicationEvent extends Event {
-
-  /**
-   * Index(i) of the i-th (physical) Input or Output that generated an Event.
-   * For a Processor-generated event, this is ignored.
-   */
-  private final int sourceIndex;
-
-  /**
-   * Index(i) of the i-th (physical) Input or Output that is meant to receive
-   * this Event. For a Processor event, this is ignored.
-   */
-  private int targetIndex;
-
-  /**
-   * User Payload for this Event
-   */
-  private final byte[] userPayload;
-
-  /**
-   * User Event constructor
-   * @param index Index to identify the physical edge of the input/output
-   * @param userPayload User Payload of the User Event
-   */
-  public TaskCommunicationEvent(int index,
-      byte[] userPayload) {
-    this.userPayload = userPayload;
-    this.sourceIndex = index;
-  }
-
-  /**
-   * Constructor for Processor-generated User Events
-   * @param userPayload
-   */
-  public TaskCommunicationEvent(byte[] userPayload) {
-    this(-1, userPayload);
-  }
-
-  public byte[] getUserPayload() {
-    return userPayload;
-  }
-
-  public int getSourceIndex() {
-    return sourceIndex;
-  }
-
-  public int getTargetIndex() {
-    return targetIndex;
-  }
-
-  void setTargetIndex(int targetIndex) {
-    this.targetIndex = targetIndex;
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
deleted file mode 100644
index fe06bec..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/EventMetaData.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-/**
- * Class that encapsulates all the information to identify the unique
- * object that either generated an Event or is the recipient of an Event.
- */
-public class EventMetaData {
-
-  public static enum EventGenerator {
-    INPUT,
-    PROCESSOR,
-    OUTPUT,
-    SYSTEM
-  }
-
-  /**
-   * Source Type ( one of Input/Output/Processor ) that generated the Event.
-   */
-  private final EventGenerator generator;
-
-  /**
-   * Name of the vertex where the event was generated.
-   */
-  private final String taskVertexName;
-
-  /**
-   * Name of the vertex to which the Input or Output is connected to.
-   */
-  private final String edgeVertexName;
-
-  /**
-   * Task Attempt ID
-   */
-  private final TezTaskAttemptID taskAttemptID;
-
-  public EventMetaData(EventGenerator generator,
-      String taskVertexName, String edgeVertexName,
-      TezTaskAttemptID taskAttemptID) {
-    this.generator = generator;
-    this.taskVertexName = taskVertexName;
-    this.edgeVertexName = edgeVertexName;
-    this.taskAttemptID = taskAttemptID;
-  }
-
-  public EventGenerator getEventGenerator() {
-    return generator;
-  }
-
-  public TezTaskAttemptID getTaskAttemptID() {
-    return taskAttemptID;
-  }
-
-  public String getTaskVertexName() {
-    return taskVertexName;
-  }
-
-  public String getEdgeVertexName() {
-    return edgeVertexName;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
deleted file mode 100644
index cb8bd6d..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/impl/TezUmbilical.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.impl;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.rpc.impl.TaskSpec;
-
-/**
- * Interface to the RPC layer ( umbilical ) between the Tez AM and
- * a Tez Container's JVM.
- */
-public interface TezUmbilical {
-
-  /**
-   * Heartbeat call back to the AM from the Container JVM
-   * @param events Events to be sent to the AM
-   * @return Events sent by the AM to the Container JVM which in turn will
-   * be handled either by the JVM or routed to the appropriate instances of
-   * Input/Processor/Outputs with a particular Task Attempt.
-   */
-  public Event[] hearbeat(Event[] events);
-
-  /**
-   * Hook to ask the Tez AM for the next task to be run on the Container
-   * @return Next task to be run
-   */
-  public TaskSpec getNextTask();
-
-  /**
-   * Hook to query the Tez AM whether a particular Task Attempt can commit its
-   * output.
-   * @param attemptIDs Attempt IDs of the Tasks that are waiting to commit.
-   * @return Map of boolean flags indicating whether the respective task
-   * attempts can commit.
-   */
-  public boolean canTaskCommit(TezTaskAttemptID attemptID);
-
-  /**
-   * Inform the Tez AM that one or more Task attempts have failed.
-   * @param attemptIDs Task Attempt IDs for the failed attempts.
-   */
-  public void taskFailed(TezTaskAttemptID attemptID);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/InputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/InputSpec.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/InputSpec.java
deleted file mode 100644
index 200ec21..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/InputSpec.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.rpc.impl;
-
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.records.TezVertexID;
-
-/**
- * Serializable information of a given Physical Input.
- */
-public interface InputSpec {
-
-  /**
-   * @return The name of the Source Vertex whose Output is consumed by this
-   * Input.
-   */
-  public String getSourceVertexName();
-
-  /**
-   * @return The Vertex ID of the Source Vertex whose Output is consumed by this
-   * Input.
-   */
-  public TezVertexID getSourceVertexID();
-
-  /**
-   * @return {@link InputDescriptor}
-   */
-  public InputDescriptor getInputDescriptor();
-
-  /**
-   * @return The no. of physical edges mapping to this Input.
-   */
-  public int getPhysicalEdgeCount();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/OutputSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/OutputSpec.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/OutputSpec.java
deleted file mode 100644
index 0cf41b6..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/OutputSpec.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.rpc.impl;
-
-import org.apache.tez.dag.api.OutputDescriptor;
-import org.apache.tez.dag.records.TezVertexID;
-
-public interface OutputSpec {
-
-  /**
-   * @return The name of the Target Vertex whose Input is consumed by this
-   * Output.
-   */
-  public String getDestinationVertexName();
-
-  /**
-   * @return The Vertex ID of the Target Vertex whose Input is consumed by this
-   * Output.
-   */
-  public TezVertexID getDestinationVertexID();
-
-  /**
-   * @return {@link OutputDescriptor}
-   */
-  public OutputDescriptor getOutputDescriptor();
-
-  /**
-   * @return The no. of physical edges mapping to this Output.
-   */
-  public int getPhysicalEdgeCount();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
deleted file mode 100644
index 2ba8cf7..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TaskSpec.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.rpc.impl;
-
-import java.util.List;
-
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-/**
- * Serializable Task information that is sent across the Umbilical from the
- * Tez AM to the Tez Container's JVM.
- */
-public interface TaskSpec {
-
-  /**
-   * Get the vertex name for the current task.
-   * @return the vertex name set by the user.
-   */
-  public String getVertexName();
-  
-  /**
-   * Get the task attempt id for the current task.
-   * @return the {@link TaskAttemptID}
-   */
-  public TezTaskAttemptID getTaskAttemptID();
-  
-  /**
-   * Get the owner of the job.
-   * @return the owner.
-   */
-  public String getUser();
-  /**
-   * The Processor definition for the given Task
-   * @return {@link ProcessorDescriptor}
-   */
-  public ProcessorDescriptor getProcessorDescriptor();
-
-  /**
-   * The List of Inputs for this Task.
-   * @return {@link Input}
-   */
-  public List<InputSpec> getInputs();
-
-  /**
-   * The List of Outputs for this Task.
-   * @return {@link Output}
-   */
-  public List<OutputSpec> getOutputs();
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
deleted file mode 100644
index 19174c8..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/rpc/impl/TezEvent.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.newapi.rpc.impl;
-
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.impl.EventMetaData;
-
-public class TezEvent {
-
-  private final String eventClassName;
-
-  private final Event event;
-
-  private EventMetaData sourceInfo;
-
-  private EventMetaData targetInfo;
-
-  public TezEvent(Event event, EventMetaData sourceInfo) {
-    this.event = event;
-    this.eventClassName = event.getClass().getName();
-    this.setSourceInfo(sourceInfo);
-  }
-
-  public Event getEvent() {
-    return event;
-  }
-
-  public EventMetaData getSourceInfo() {
-    return sourceInfo;
-  }
-
-  public void setSourceInfo(EventMetaData sourceInfo) {
-    this.sourceInfo = sourceInfo;
-  }
-
-  public EventMetaData getTargetInfo() {
-    return targetInfo;
-  }
-
-  public void setTargetInfo(EventMetaData targetInfo) {
-    this.targetInfo = targetInfo;
-  }
-
-  public String getEventClassName() {
-    return eventClassName;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine-api/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/proto/Events.proto b/tez-engine-api/src/main/proto/Events.proto
new file mode 100644
index 0000000..3651c35
--- /dev/null
+++ b/tez-engine-api/src/main/proto/Events.proto
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tez.engine.api.events";
+option java_outer_classname = "EventProtos";
+option java_generate_equals_and_hash = true;
+
+message DataMovementEventProto {
+  optional int32 source_index = 1;
+  optional int32 target_index = 2;
+  optional bytes user_payload = 3;
+  optional int32 version = 4;
+}
+
+message InputDataErrorEventProto {
+  optional int32 index = 1;
+  optional string diagnostics = 2;
+  optional int32 version = 3;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/pom.xml
----------------------------------------------------------------------
diff --git a/tez-engine/pom.xml b/tez-engine/pom.xml
index 4ecf210..ecfdc04 100644
--- a/tez-engine/pom.xml
+++ b/tez-engine/pom.xml
@@ -49,6 +49,10 @@
       <groupId>com.google.inject</groupId>
       <artifactId>guice</artifactId>
     </dependency>
+    <dependency>
+     <groupId>com.google.protobuf</groupId>
+     <artifactId>protobuf-java</artifactId>
+    </dependency>
   </dependencies>
 
   <build>
@@ -59,6 +63,33 @@
         <configuration>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
+        <executions>
+          <execution>
+            <id>compile-protoc</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>protoc</goal>
+            </goals>
+            <configuration>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <protocCommand>${protoc.path}</protocCommand>
+              <imports>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>Events.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-sources/java</output>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cc83e45/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java b/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
new file mode 100644
index 0000000..3c18d9f
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/common/ContainerTask.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.engine.newapi.impl.TaskSpec;
+
+public class ContainerTask implements Writable {
+
+  TaskSpec taskSpec;
+  boolean shouldDie;
+
+  public ContainerTask() {
+  }
+
+  public ContainerTask(TaskSpec taskSpec, boolean shouldDie) {
+    this.taskSpec = taskSpec;
+    this.shouldDie = shouldDie;
+  }
+
+  public TaskSpec getTaskSpec() {
+    return taskSpec;
+  }
+
+  public boolean shouldDie() {
+    return shouldDie;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(shouldDie);
+    if (taskSpec != null) {
+      out.writeBoolean(true);
+      taskSpec.write(out);
+    } else {
+      out.writeBoolean(false);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    shouldDie = in.readBoolean();
+    boolean taskComing = in.readBoolean();
+    if (taskComing) {
+      taskSpec = new TaskSpec();
+      taskSpec.readFields(in);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "shouldDie: " + shouldDie + ", TaskSpec: "
+        + taskSpec;
+  }
+}