You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:25 UTC

[18/50] [abbrv] TEZ-443. Merge tez-dag-api and tez-engine-api into a single module - tez-api (part of TEZ-398). (sseth)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
deleted file mode 100644
index dae5625..0000000
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api.client.rpc;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.VertexStatus;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetDAGStatusRequestProto;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetVertexStatusRequestProto;
-import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.TryKillDAGRequestProto;
-import org.apache.tez.dag.api.records.DAGProtos.DAGStatusProto;
-import org.apache.tez.dag.api.records.DAGProtos.DAGStatusStateProto;
-
-import com.google.protobuf.ServiceException;
-
-/**
- * {@link DAGClient} implementation that queries the DAG's ApplicationMaster
- * (AM) over protobuf RPC and, for DAG status only, falls back to the YARN
- * application report when no AM proxy is available or the AM call fails.
- */
-public class DAGClientRPCImpl implements DAGClient {
-  private static final Log LOG = LogFactory.getLog(DAGClientRPCImpl.class);
-
-  private final ApplicationId appId;
-  private final String dagId;
-  private final TezConfiguration conf;
-  // Last report fetched by createAMProxyIfNeeded(); null until the first fetch.
-  private ApplicationReport appReport;
-  private YarnClient yarnClient;
-  // RPC proxy to the AM; null whenever no usable connection is held.
-  private DAGClientAMProtocolBlockingPB proxy = null;
-
-  /**
-   * Creates a client for one DAG and starts the YarnClient used to fetch the
-   * application report.
-   *
-   * @param appId YARN application id passed to the YarnClient lookups
-   * @param dagId DAG id set on every request sent to the AM
-   * @param conf configuration used for the YarnClient and the AM proxy
-   */
-  public DAGClientRPCImpl(ApplicationId appId, String dagId,
-      TezConfiguration conf) {
-    this.appId = appId;
-    this.dagId = dagId;
-    this.conf = conf;
-    yarnClient = new YarnClientImpl();
-    yarnClient.init(new YarnConfiguration(conf));
-    yarnClient.start();
-    appReport = null;
-  }
-
-  @Override
-  public ApplicationId getApplicationId() {
-    return appId;
-  }
-
-  /**
-   * Returns the DAG status reported by the AM when a proxy can be created and
-   * the call succeeds; otherwise returns a status derived from the YARN
-   * application report.
-   */
-  @Override
-  public DAGStatus getDAGStatus() throws IOException, TezException {
-    if (createAMProxyIfNeeded()) {
-      try {
-        return getDAGStatusViaAM();
-      } catch (TezException e) {
-        resetProxy(e); // drop the proxy so the next call creates it again
-      }
-    }
-
-    // Later maybe from History
-    return getDAGStatusViaRM();
-  }
-
-  /**
-   * Returns the status of the named vertex as reported by the AM.
-   *
-   * @param vertexName name of the vertex to query
-   * @return the vertex status, or {@code null} when no AM proxy is available
-   *         or the AM call fails (there is no fallback source for vertices)
-   */
-  @Override
-  public VertexStatus getVertexStatus(String vertexName)
-                                    throws IOException, TezException {
-    if (createAMProxyIfNeeded()) {
-      try {
-        return getVertexStatusViaAM(vertexName);
-      } catch (TezException e) {
-        resetProxy(e); // drop the proxy so the next call creates it again
-      }
-    }
-
-    // need AM for this. Later maybe from History
-    return null;
-  }
-
-  /**
-   * Asks the AM to kill the DAG. This is best effort: nothing is sent when no
-   * AM proxy is available, and an RPC failure only resets the proxy.
-   */
-  @Override
-  public void tryKillDAG() throws TezException, IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("TryKill for app: " + appId + " dag:" + dagId);
-    }
-    if (createAMProxyIfNeeded()) {
-      TryKillDAGRequestProto requestProto =
-          TryKillDAGRequestProto.newBuilder().setDagId(dagId).build();
-      try {
-        proxy.tryKillDAG(null, requestProto);
-      } catch (ServiceException e) {
-        resetProxy(e);
-      }
-    }
-  }
-
-  /**
-   * Stops the AM proxy (if any) and the YarnClient. The YarnClient is stopped
-   * even when stopping the proxy throws.
-   */
-  @Override
-  public void close() throws IOException {
-    try {
-      stopProxy();
-    } finally {
-      if (yarnClient != null) {
-        yarnClient.stop();
-      }
-    }
-  }
-
-  @Override
-  public ApplicationReport getApplicationReport() {
-    return appReport;
-  }
-
-  /**
-   * Discards the current AM proxy after a failed call so that the next
-   * request creates a fresh one.
-   *
-   * @param e the failure that triggered the reset (logged at debug level)
-   */
-  void resetProxy(Exception e) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Resetting AM proxy for app: " + appId + " dag:" + dagId +
-          " due to exception :", e);
-    }
-    try {
-      // Release the RPC connection instead of merely dropping the reference.
-      stopProxy();
-    } catch (RuntimeException stopFailure) {
-      // Callers are already on a failure path and go on to a fallback, so a
-      // failure to stop the broken proxy must not mask the original problem.
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Failed to stop AM proxy for app: " + appId, stopFailure);
-      }
-    }
-  }
-
-  /** Stops the AM proxy if one is held; the field is cleared first. */
-  private void stopProxy() {
-    if (proxy != null) {
-      DAGClientAMProtocolBlockingPB stale = proxy;
-      proxy = null;
-      RPC.stopProxy(stale);
-    }
-  }
-
-  /**
-   * Fetches the DAG status from the AM through the existing proxy.
-   *
-   * @throws TezException wrapping the RPC {@link ServiceException}
-   */
-  DAGStatus getDAGStatusViaAM() throws IOException, TezException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("GetDAGStatus via AM for app: " + appId + " dag:" + dagId);
-    }
-    GetDAGStatusRequestProto requestProto =
-        GetDAGStatusRequestProto.newBuilder().setDagId(dagId).build();
-    try {
-      return new DAGStatus(
-                 proxy.getDAGStatus(null, requestProto).getDagStatus());
-    } catch (ServiceException e) {
-      // TEZ-151 retrieve wrapped TezException
-      throw new TezException(e);
-    }
-  }
-
-  /**
-   * Builds a DAG status from the YARN application report by mapping the
-   * application state (and, for FINISHED, the final status) to a DAG state
-   * and copying the report diagnostics.
-   *
-   * @throws TezException if the report cannot be fetched or is null
-   * @throws TezUncheckedException if YARN reports a state not handled here
-   */
-  DAGStatus getDAGStatusViaRM() throws TezException, IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("GetDAGStatus via RM for app: " + appId + " dag:" + dagId);
-    }
-    // Local name differs from the appReport field on purpose: this lookup
-    // does not update the cached report.
-    ApplicationReport rmReport;
-    try {
-      rmReport = yarnClient.getApplicationReport(appId);
-    } catch (YarnException e) {
-      throw new TezException(e);
-    }
-
-    if (rmReport == null) {
-      throw new TezException("Unknown/Invalid appId: " + appId);
-    }
-
-    DAGStatusStateProto dagState = null;
-    switch (rmReport.getYarnApplicationState()) {
-    case NEW:
-    case NEW_SAVING:
-    case SUBMITTED:
-    case ACCEPTED:
-      dagState = DAGStatusStateProto.DAG_SUBMITTED;
-      break;
-    case RUNNING:
-      dagState = DAGStatusStateProto.DAG_RUNNING;
-      break;
-    case FAILED:
-      dagState = DAGStatusStateProto.DAG_FAILED;
-      break;
-    case KILLED:
-      dagState = DAGStatusStateProto.DAG_KILLED;
-      break;
-    case FINISHED:
-      switch (rmReport.getFinalApplicationStatus()) {
-      case UNDEFINED:
-      case FAILED:
-        dagState = DAGStatusStateProto.DAG_FAILED;
-        break;
-      case KILLED:
-        dagState = DAGStatusStateProto.DAG_KILLED;
-        break;
-      case SUCCEEDED:
-        dagState = DAGStatusStateProto.DAG_SUCCEEDED;
-        break;
-      default:
-        throw new TezUncheckedException("Encountered unknown final application"
-          + " status from YARN"
-          + ", appState=" + rmReport.getYarnApplicationState()
-          + ", finalStatus=" + rmReport.getFinalApplicationStatus());
-      }
-      break;
-    default:
-      throw new TezUncheckedException("Encountered unknown application state"
-          + " from YARN, appState=" + rmReport.getYarnApplicationState());
-    }
-
-    DAGStatusProto.Builder builder = DAGStatusProto.newBuilder();
-    builder.setState(dagState);
-    if (rmReport.getDiagnostics() != null) {
-      builder.addAllDiagnostics(Collections.singleton(rmReport.getDiagnostics()));
-    }
-
-    return new DAGStatus(builder);
-  }
-
-  /**
-   * Fetches the status of one vertex from the AM through the existing proxy.
-   *
-   * @param vertexName name of the vertex to query
-   * @throws TezException wrapping the RPC {@link ServiceException}
-   */
-  VertexStatus getVertexStatusViaAM(String vertexName) throws TezException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("GetVertexStatus via AM for app: " + appId + " dag: " + dagId
-          + " vertex: " + vertexName);
-    }
-    GetVertexStatusRequestProto requestProto =
-        GetVertexStatusRequestProto.newBuilder().
-                        setDagId(dagId).setVertexName(vertexName).build();
-
-    try {
-      return new VertexStatus(
-                 proxy.getVertexStatus(null, requestProto).getVertexStatus());
-    } catch (ServiceException e) {
-      // TEZ-151 retrieve wrapped TezException
-      throw new TezException(e);
-    }
-  }
-
-  /**
-   * Fetches the current application report from YARN.
-   *
-   * @return the report as returned by the YarnClient; callers check for null
-   * @throws TezException wrapping a {@link YarnException} from the lookup
-   */
-  ApplicationReport getAppReport() throws IOException, TezException {
-    try {
-      ApplicationReport report = yarnClient.getApplicationReport(appId);
-      if (LOG.isDebugEnabled()) {
-        // Guard the dereference: the caller treats a null report as possible.
-        LOG.debug("App: " + appId + " in state: "
-            + (report == null ? null : report.getYarnApplicationState()));
-      }
-      return report;
-    } catch (YarnException e) {
-      throw new TezException(e);
-    }
-  }
-
-  /**
-   * Ensures an AM proxy exists, creating one from the application report's
-   * host and RPC port when the application is RUNNING.
-   *
-   * @return {@code true} if a proxy is available, {@code false} if the AM
-   *         cannot be contacted yet (no report, not RUNNING, or no address)
-   */
-  boolean createAMProxyIfNeeded() throws IOException, TezException {
-    if (proxy != null) {
-      // if proxy exist optimistically use it assuming there is no retry
-      return true;
-    }
-    appReport = getAppReport();
-
-    if (appReport == null) {
-      return false;
-    }
-    YarnApplicationState appState = appReport.getYarnApplicationState();
-    if (appState != YarnApplicationState.RUNNING) {
-      return false;
-    }
-
-    // YARN-808. Cannot ascertain if AM is ready until we connect to it.
-    // workaround check the default string set by YARN
-    String amHost = appReport.getHost();
-    int amPort = appReport.getRpcPort();
-    if (amHost == null || "N/A".equals(amHost) || amPort == 0) {
-      // attempt not running
-      return false;
-    }
-
-    InetSocketAddress addr = new InetSocketAddress(amHost, amPort);
-
-    RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class,
-        ProtobufRpcEngine.class);
-    proxy = (DAGClientAMProtocolBlockingPB) RPC.getProxy(
-        DAGClientAMProtocolBlockingPB.class, 0, addr, conf);
-    return true;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag-api/src/main/proto/DAGApiRecords.proto
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/proto/DAGApiRecords.proto b/tez-dag-api/src/main/proto/DAGApiRecords.proto
deleted file mode 100644
index 4385749..0000000
--- a/tez-dag-api/src/main/proto/DAGApiRecords.proto
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tez.dag.api.records";
-option java_outer_classname = "DAGProtos";
-option java_generate_equals_and_hash = true;
-
-// DAG plan messages
-
-// Many of these types have a dual in the Tez-api.  To reduce confusion, these types have prefix or suffix 
-// of "Plan" to indicate they are to be used in the dag-plan.
-// The big types use a suffix:  JobPlan, VertexPlan, EdgePlan 
-//   --> these get more direct use in the runtime and the naming is natural.
-// The enums and utility types use prefix: PlanVertexType, PlanEdgeDataMovementType, etc
-//   --> there is no great naming choice for these that avoids ambiguity, but this one seems acceptable.
-
-enum PlanVertexType { // type of a vertex in the plan; carried in VertexPlan.type
-  INPUT = 0;
-  NORMAL = 1;
-  OUTPUT = 2;
-}
-
-enum PlanEdgeDataMovementType { // carried in EdgePlan.dataMovementType
-  ONE_TO_ONE = 0;
-  BROADCAST = 1;
-  SCATTER_GATHER = 2;
-}
-
-enum PlanEdgeDataSourceType { // carried in EdgePlan.dataSourceType
-  PERSISTED = 0;
-  PERSISTED_RELIABLE = 1;
-  EPHEMERAL = 2;
-}
-
-enum PlanEdgeSchedulingType { // carried in EdgePlan.schedulingType
-  SEQUENTIAL = 0;
-  CONCURRENT = 1;
-}
-
-message PlanKeyValuePair { // string pair; used for environmentSetting and confKeyValues entries
-  required string key = 1;
-  required string value = 2;
-}
-
-enum PlanLocalResourceType { // carried in PlanLocalResource.type
-  FILE = 0;
-  ARCHIVE = 1;
-  PATTERN = 2; // PlanLocalResource.pattern is only used with this value
-}
-
-enum PlanLocalResourceVisibility { // carried in PlanLocalResource.visibility
-  PUBLIC = 0;
-  PRIVATE = 1;
-  APPLICATION = 2;
-}
-
-message PlanLocalResource { // one entry of PlanTaskConfiguration.localResource
-  required string name = 1;
-  required string uri = 2;
-  required int64 size = 3; // unit not stated here; presumably bytes - confirm
-  required int64 timeStamp = 4;
-  required PlanLocalResourceType type = 5;
-  required PlanLocalResourceVisibility visibility = 6;
-  optional string pattern = 7; // only used if type=PATTERN
-}
-
-// Each taskLocationHint represents a single split in the input.
-// It is the list of [{rack,machines}] that host a replica of each particular split.
-// For now it is represented as pair-of-arrays rather than array-of-pairs.
-message PlanTaskLocationHint { // one entry of VertexPlan.taskLocationHint
-  repeated string rack = 1;
-  repeated string host = 2;
-}
-
-message PlanTaskConfiguration { // per-vertex task settings; carried in VertexPlan.taskConfig
-  required int32 numTasks = 1;
-  required int32 memoryMb = 2;
-  required int32 virtualCores = 3;
-  required string javaOpts = 4;
-  required string taskModule = 5;
-  repeated PlanLocalResource localResource = 6;
-  repeated PlanKeyValuePair environmentSetting = 8;  // note: tag 7 is not used in this message
-}
-
-message TezEntityDescriptorProto { // used for processor_descriptor, edge_source and edge_destination
-  optional string class_name = 1;
-  optional bytes user_payload = 2; // opaque bytes supplied by the user
-}
-
-message VertexPlan { // one entry of DAGPlan.vertex
-  required string name = 1;
-  required PlanVertexType type = 2;
-  optional TezEntityDescriptorProto processor_descriptor = 3;
-  required PlanTaskConfiguration taskConfig = 4;
-  repeated PlanTaskLocationHint taskLocationHint = 7; // note: tags 5 and 6 are not used in this message
-  repeated string inEdgeId = 8;
-  repeated string outEdgeId = 9;
-}
-
-message EdgePlan { // one entry of DAGPlan.edge
-  required string id = 1; // presumably the id listed in VertexPlan.inEdgeId/outEdgeId - confirm
-  required string inputVertexName = 2;
-  required string outputVertexName = 3;
-  required PlanEdgeDataMovementType dataMovementType = 4;
-  required PlanEdgeDataSourceType dataSourceType = 5;
-  required PlanEdgeSchedulingType schedulingType = 6;
-  optional TezEntityDescriptorProto edge_source = 7;
-  optional TezEntityDescriptorProto edge_destination = 8;
-}
-
-message ConfigurationProto { // list of key/value pairs; carried in DAGPlan.dagKeyValues
-  repeated PlanKeyValuePair confKeyValues = 1;
-}
-
-message DAGPlan { // top-level plan: a name plus vertices, edges and optional key/values
-  required string name = 1;
-  repeated VertexPlan vertex = 2;
-  repeated EdgePlan edge = 3;
-  optional ConfigurationProto dagKeyValues = 4;
-}
-
-// DAG monitoring messages
-message ProgressProto { // task counters; used in VertexStatusProto, DAGStatusProto and StringProgressPairProto
-  optional int32 totalTaskCount = 1;
-  optional int32 succeededTaskCount = 2;
-  optional int32 runningTaskCount = 3;
-  optional int32 failedTaskCount = 4;
-  optional int32 killedTaskCount = 5;
-}
-
-enum VertexStatusStateProto { // carried in VertexStatusProto.state
-  VERTEX_INITED = 0;
-  VERTEX_RUNNING = 1;
-  VERTEX_SUCCEEDED = 2;
-  VERTEX_KILLED = 3;
-  VERTEX_FAILED = 4;
-  VERTEX_ERROR = 5;
-  VERTEX_TERMINATING = 6;
-}
-
-message VertexStatusProto { // status of one vertex: state, diagnostics strings and progress
-  optional VertexStatusStateProto state = 1;
-  repeated string diagnostics = 2;
-  optional ProgressProto progress = 3;
-}
-
-enum DAGStatusStateProto { // carried in DAGStatusProto.state
-  DAG_SUBMITTED = 0;
-  DAG_INITING = 1;
-  DAG_RUNNING = 2;
-  DAG_SUCCEEDED = 3;
-  DAG_KILLED = 4;
-  DAG_FAILED = 5;
-  DAG_ERROR = 6;
-  DAG_TERMINATING = 7;
-}
-
-message StringProgressPairProto { // one entry of DAGStatusProto.vertexProgress
-  required string key = 1; // presumably the vertex name - confirm against the producer
-  required ProgressProto progress = 2;
-}
-
-message DAGStatusProto { // status of a DAG: state, diagnostics strings, overall and per-entry progress
-  optional DAGStatusStateProto state = 1;
-  repeated string diagnostics = 2;
-  optional ProgressProto DAGProgress = 3;
-  repeated StringProgressPairProto vertexProgress = 4;  
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto b/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
deleted file mode 100644
index 6fcd1f8..0000000
--- a/tez-dag-api/src/main/proto/DAGClientAMProtocol.proto
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tez.dag.api.client.rpc";
-option java_outer_classname = "DAGClientAMProtocolRPC";
-option java_generic_services = true;
-option java_generate_equals_and_hash = true;
-
-//import "DAGClientAMProtocolRecords.proto";
-
-import "DAGApiRecords.proto";
-
-message GetAllDAGsRequestProto { // request of the getAllDAGs rpc; no fields
-}
-
-message GetAllDAGsResponseProto { // response of the getAllDAGs rpc
-  repeated string dagId = 1;
-}
-
-message GetDAGStatusRequestProto { // request of the getDAGStatus rpc
-  optional string dagId = 1;
-}
-
-message GetDAGStatusResponseProto { // response of the getDAGStatus rpc
-  optional DAGStatusProto dagStatus = 1;
-}
-
-message GetVertexStatusRequestProto { // request of the getVertexStatus rpc
-  optional string dagId = 1;
-  optional string vertexName = 2;
-}
-
-message GetVertexStatusResponseProto { // response of the getVertexStatus rpc
-  optional VertexStatusProto vertexStatus = 1;
-}
-
-message TryKillDAGRequestProto { // request of the tryKillDAG rpc
-  optional string dagId = 1;
-}
-
-message TryKillDAGResponseProto { // response of the tryKillDAG rpc
-  //nothing yet
-}
-
-message SubmitDAGRequestProto { // request of the submitDAG rpc
-  optional DAGPlan d_a_g_plan = 1; // DAGPlan comes from the imported DAGApiRecords.proto
-}
-
-message SubmitDAGResponseProto { // response of the submitDAG rpc
-  optional string dagId = 1;
-}
-
-message ShutdownSessionRequestProto { // request of the shutdownSession rpc; no fields
-}
-
-message ShutdownSessionResponseProto { // response of the shutdownSession rpc; no fields
-}
-
-service DAGClientAMProtocol { // generic service (java_generic_services = true); one request/response message pair per rpc
-  rpc getAllDAGs (GetAllDAGsRequestProto) returns (GetAllDAGsResponseProto);
-  rpc getDAGStatus (GetDAGStatusRequestProto) returns (GetDAGStatusResponseProto);
-  rpc getVertexStatus (GetVertexStatusRequestProto) returns (GetVertexStatusResponseProto);
-  rpc tryKillDAG (TryKillDAGRequestProto) returns (TryKillDAGResponseProto);
-  rpc submitDAG (SubmitDAGRequestProto) returns (SubmitDAGResponseProto);
-  rpc shutdownSession (ShutdownSessionRequestProto) returns (ShutdownSessionResponseProto);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
deleted file mode 100644
index 53ec357..0000000
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGPlan.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
-import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
-import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
-import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
-import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
-import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-// based on TestDAGLocationHint
-/**
- * Tests for building {@link DAGPlan} protos: a write/read round trip through
- * a file, and preservation of descriptor class names and user payloads.
- */
-public class TestDAGPlan {
-  @Rule
-  public TemporaryFolder tempFolder = new TemporaryFolder(); //TODO: doesn't seem to be deleting this folder automatically as expected.
-
-  /**
-   * Writes a single-vertex plan to a temporary file, reads it back and
-   * expects the parsed plan to equal the original.
-   */
-  @Test
-  public void testBasicJobPlanSerde() throws IOException {
-
-    DAGPlan job = DAGPlan.newBuilder()
-       .setName("test")
-       .addVertex(
-           VertexPlan.newBuilder()
-             .setName("vertex1")
-             .setType(PlanVertexType.NORMAL)
-             .addTaskLocationHint(PlanTaskLocationHint.newBuilder().addHost("machineName").addRack("rack1").build())
-             .setTaskConfig(
-                 PlanTaskConfiguration.newBuilder()
-                   .setNumTasks(2)
-                   .setVirtualCores(4)
-                   .setMemoryMb(1024)
-                   .setJavaOpts("")
-                   .setTaskModule("x.y")
-                   .build())
-             .build())
-        .build();
-    File file = tempFolder.newFile("jobPlan");
-    FileOutputStream outStream = null;
-    try {
-      outStream = new FileOutputStream(file);
-      job.writeTo(outStream);
-    }
-    finally {
-      if(outStream != null){
-        outStream.close();
-      }
-    }
-
-    DAGPlan inJob;
-    FileInputStream inputStream = null;
-    try {
-      inputStream = new FileInputStream(file);
-      inJob = DAGPlan.newBuilder().mergeFrom(inputStream).build();
-    }
-    finally {
-      // Close the stream that was opened for reading; the output stream was
-      // already closed above.
-      if(inputStream != null){
-        inputStream.close();
-      }
-    }
-
-    Assert.assertEquals(job, inJob);
-  }
-
-  /**
-   * Builds a two-vertex DAG with one edge and checks that processor, input
-   * and output descriptor class names and user payloads are present in the
-   * generated plan and in the EdgeProperty rebuilt from it.
-   */
-  @Test
-  public void testUserPayloadSerde() {
-    DAG dag = new DAG("testDag");
-    ProcessorDescriptor pd1 = new ProcessorDescriptor("processor1").
-        setUserPayload("processor1Bytes".getBytes());
-    ProcessorDescriptor pd2 = new ProcessorDescriptor("processor2").
-        setUserPayload("processor2Bytes".getBytes());
-    Vertex v1 = new Vertex("v1", pd1, 10, Resource.newInstance(1024, 1));
-    Vertex v2 = new Vertex("v2", pd2, 1, Resource.newInstance(1024, 1));
-    v1.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalResources(new HashMap<String, LocalResource>());
-    v2.setJavaOpts("").setTaskEnvironment(new HashMap<String, String>())
-        .setTaskLocalResources(new HashMap<String, LocalResource>());
-
-    InputDescriptor inputDescriptor = new InputDescriptor("input").
-        setUserPayload("inputBytes".getBytes());
-    OutputDescriptor outputDescriptor = new OutputDescriptor("output").
-        setUserPayload("outputBytes".getBytes());
-    Edge edge = new Edge(v1, v2, new EdgeProperty(
-        DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
-        SchedulingType.SEQUENTIAL, outputDescriptor, inputDescriptor));
-
-    dag.addVertex(v1).addVertex(v2).addEdge(edge);
-
-    DAGPlan dagProto = dag.createDag(new TezConfiguration());
-
-    assertEquals(2, dagProto.getVertexCount());
-    assertEquals(1, dagProto.getEdgeCount());
-
-    VertexPlan v1Proto = dagProto.getVertex(0);
-    VertexPlan v2Proto = dagProto.getVertex(1);
-    EdgePlan edgeProto = dagProto.getEdge(0);
-
-    assertEquals("processor1Bytes", new String(v1Proto.getProcessorDescriptor()
-        .getUserPayload().toByteArray()));
-    assertEquals("processor1", v1Proto.getProcessorDescriptor().getClassName());
-
-    assertEquals("processor2Bytes", new String(v2Proto.getProcessorDescriptor()
-        .getUserPayload().toByteArray()));
-    assertEquals("processor2", v2Proto.getProcessorDescriptor().getClassName());
-
-    assertEquals("inputBytes", new String(edgeProto.getEdgeDestination()
-        .getUserPayload().toByteArray()));
-    assertEquals("input", edgeProto.getEdgeDestination().getClassName());
-
-    assertEquals("outputBytes", new String(edgeProto.getEdgeSource()
-        .getUserPayload().toByteArray()));
-    assertEquals("output", edgeProto.getEdgeSource().getClassName());
-
-    EdgeProperty edgeProperty = DagTypeConverters
-        .createEdgePropertyMapFromDAGPlan(dagProto.getEdgeList().get(0));
-
-    byte[] ib = edgeProperty.getEdgeDestination().getUserPayload();
-    assertEquals("inputBytes", new String(ib));
-    assertEquals("input", edgeProperty.getEdgeDestination().getClassName());
-
-    byte[] ob = edgeProperty.getEdgeSource().getUserPayload();
-    assertEquals("outputBytes", new String(ob));
-    assertEquals("output", edgeProperty.getEdgeSource().getClassName());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
deleted file mode 100644
index b33f3a6..0000000
--- a/tez-dag-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ /dev/null
@@ -1,417 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.api;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestDAGVerify {
-
-  private final String dummyProcessorClassName = TestDAGVerify.class.getName();
-  private final String dummyInputClassName = TestDAGVerify.class.getName();
-  private final String dummyOutputClassName = TestDAGVerify.class.getName();
-  private final int dummyTaskCount = 2;
-  private final Resource dummyTaskResource = Resource.newInstance(1, 1);
-
-  //    v1
-  //    |
-  //    v2
-  @Test
-  public void testVerify1() {
-    Vertex v1 = new Vertex("v1",
-        new ProcessorDescriptor(dummyProcessorClassName),
-        dummyTaskCount, dummyTaskResource);
-    Vertex v2 = new Vertex("v2",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Edge e1 = new Edge(v1, v2,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor(dummyOutputClassName),
-            new InputDescriptor(dummyInputClassName)));
-    DAG dag = new DAG("testDag");
-    dag.addVertex(v1);
-    dag.addVertex(v2);
-    dag.addEdge(e1);
-    dag.verify();
-  }
-
-  @Test(expected = IllegalStateException.class)  
-  public void testVerify2() {
-    Vertex v1 = new Vertex("v1",
-        new ProcessorDescriptor(dummyProcessorClassName),
-        dummyTaskCount, dummyTaskResource);
-    Vertex v2 = new Vertex("v2",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Edge e1 = new Edge(v1, v2,
-        new EdgeProperty(DataMovementType.ONE_TO_ONE, 
-            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor(dummyOutputClassName),
-            new InputDescriptor(dummyInputClassName)));
-    DAG dag = new DAG("testDag");
-    dag.addVertex(v1);
-    dag.addVertex(v2);
-    dag.addEdge(e1);
-    dag.verify();
-  }
-
-  @Test(expected = IllegalStateException.class)  
-  public void testVerify3() {
-    Vertex v1 = new Vertex("v1",
-        new ProcessorDescriptor(dummyProcessorClassName),
-        dummyTaskCount, dummyTaskResource);
-    Vertex v2 = new Vertex("v2",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Edge e1 = new Edge(v1, v2,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.EPHEMERAL, SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor(dummyOutputClassName),
-            new InputDescriptor(dummyInputClassName)));
-    DAG dag = new DAG("testDag");
-    dag.addVertex(v1);
-    dag.addVertex(v2);
-    dag.addEdge(e1);
-    dag.verify();
-  }
-
-  @Test(expected = IllegalStateException.class)  
-  public void testVerify4() {
-    Vertex v1 = new Vertex("v1",
-        new ProcessorDescriptor(dummyProcessorClassName),
-        dummyTaskCount, dummyTaskResource);
-    Vertex v2 = new Vertex("v2",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Edge e1 = new Edge(v1, v2,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.EPHEMERAL, SchedulingType.CONCURRENT, 
-            new OutputDescriptor(dummyOutputClassName),
-            new InputDescriptor(dummyInputClassName)));
-    DAG dag = new DAG("testDag");
-    dag.addVertex(v1);
-    dag.addVertex(v2);
-    dag.addEdge(e1);
-    dag.verify();
-  }
-
-  //    v1 <----
-  //      |     ^
-  //       v2   ^
-  //      |  |  ^
-  //    v3    v4
-  @Test
-  public void testCycle1() {
-    IllegalStateException ex=null;
-    Vertex v1 = new Vertex("v1",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Vertex v2 = new Vertex("v2",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Vertex v3 = new Vertex("v3",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Vertex v4 = new Vertex("v4",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Edge e1 = new Edge(v1, v2,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor("dummy output class"),
-            new InputDescriptor("dummy input class")));
-    Edge e2 = new Edge(v2, v3,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor("dummy output class"),
-            new InputDescriptor("dummy input class")));
-    Edge e3 = new Edge(v2, v4,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor("dummy output class"),
-            new InputDescriptor("dummy input class")));
-    Edge e4 = new Edge(v4, v1,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor("dummy output class"),
-            new InputDescriptor("dummy input class")));
-    DAG dag = new DAG("testDag");
-    dag.addVertex(v1);
-    dag.addVertex(v2);
-    dag.addVertex(v3);
-    dag.addVertex(v4);
-    dag.addEdge(e1);
-    dag.addEdge(e2);
-    dag.addEdge(e3);
-    dag.addEdge(e4);
-    try{
-      dag.verify();
-    }
-    catch (IllegalStateException e){
-      ex = e;
-    }
-    Assert.assertNotNull(ex);
-    System.out.println(ex.getMessage());
-    Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle"));
-  }
-
-  //     v1
-  //      |
-  //    -> v2
-  //    ^  | |
-  //    v3    v4
-  @Test
-  public void testCycle2() {
-    IllegalStateException ex=null;
-    Vertex v1 = new Vertex("v1",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Vertex v2 = new Vertex("v2",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Vertex v3 = new Vertex("v3",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Vertex v4 = new Vertex("v4",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Edge e1 = new Edge(v1, v2,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor("dummy output class"),
-            new InputDescriptor("dummy input class")));
-    Edge e2 = new Edge(v2, v3,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor("dummy output class"),
-            new InputDescriptor("dummy input class")));
-    Edge e3 = new Edge(v2, v4,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor("dummy output class"),
-            new InputDescriptor("dummy input class")));
-    Edge e4 = new Edge(v3, v2,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor("dummy output class"),
-            new InputDescriptor("dummy input class")));
-    DAG dag = new DAG("testDag");
-    dag.addVertex(v1);
-    dag.addVertex(v2);
-    dag.addVertex(v3);
-    dag.addVertex(v4);
-    dag.addEdge(e1);
-    dag.addEdge(e2);
-    dag.addEdge(e3);
-    dag.addEdge(e4);
-    try{
-      dag.verify();
-    }
-    catch (IllegalStateException e){
-      ex = e;
-    }
-    Assert.assertNotNull(ex);
-    System.out.println(ex.getMessage());
-    Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle"));
-  }
-
-  @Test
-  public void repeatedVertexName() {
-    IllegalStateException ex=null;
-    Vertex v1 = new Vertex("v1",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Vertex v1repeat = new Vertex("v1",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    DAG dag = new DAG("testDag");
-    dag.addVertex(v1);
-    dag.addVertex(v1repeat);
-    try {
-      dag.verify();
-    }
-    catch (IllegalStateException e){
-      ex = e;
-    }
-    Assert.assertNotNull(ex);
-    System.out.println(ex.getMessage());
-    Assert.assertTrue(ex.getMessage().startsWith("DAG contains multiple vertices with name"));
-  }
-
-  //  v1  v2
-  //   |  |
-  //    v3
-  @Test
-  public void BinaryInputDisallowed() {
-    IllegalStateException ex=null;
-    try {
-      Vertex v1 = new Vertex("v1",
-          new ProcessorDescriptor("MapProcessor"),
-          dummyTaskCount, dummyTaskResource);
-      Vertex v2 = new Vertex("v2",
-          new ProcessorDescriptor("MapProcessor"),
-          dummyTaskCount, dummyTaskResource);
-      Vertex v3 = new Vertex("v3",
-          new ProcessorDescriptor("ReduceProcessor"),
-          dummyTaskCount, dummyTaskResource);
-      Edge e1 = new Edge(v1, v3,
-          new EdgeProperty(DataMovementType.ONE_TO_ONE, 
-              DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-              new OutputDescriptor("dummy output class"),
-              new InputDescriptor("dummy input class")));
-      Edge e2 = new Edge(v2, v3,
-          new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-              DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-              new OutputDescriptor("dummy output class"),
-              new InputDescriptor("dummy input class")));
-      DAG dag = new DAG("testDag");
-      dag.addVertex(v1);
-      dag.addVertex(v2);
-      dag.addVertex(v3);
-      dag.addEdge(e1);
-      dag.addEdge(e2);
-      dag.verify();
-    }
-    catch (IllegalStateException e){
-      ex = e;
-    }
-    Assert.assertNotNull(ex);
-    System.out.println(ex.getMessage());
-    Assert.assertTrue(ex.getMessage().startsWith(
-        "Unsupported connection pattern on edge"));
-  }
-
-  //  v1  v2
-  //   |  |
-  //    v3
-  @Test
-  public void BinaryInputAllowed() {
-    Vertex v1 = new Vertex("v1",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Vertex v2 = new Vertex("v2",
-        new ProcessorDescriptor("MapProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Vertex v3 = new Vertex("v3",
-        new ProcessorDescriptor("ReduceProcessor"),
-        dummyTaskCount, dummyTaskResource);
-    Edge e1 = new Edge(v1, v3,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
-            new OutputDescriptor("dummy output class"),
-            new InputDescriptor("dummy input class")));
-    Edge e2 = new Edge(v2, v3,
-        new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor("dummy output class"),
-            new InputDescriptor("dummy input class")));
-    DAG dag = new DAG("testDag");
-    dag.addVertex(v1);
-    dag.addVertex(v2);
-    dag.addVertex(v3);
-    dag.addEdge(e1);
-    dag.addEdge(e2);
-    dag.verify();
-  }
-
-  //   v1
-  //  |  |
-  //  v2  v3
-  @Test
-  public void BinaryOutput() {
-    IllegalStateException ex=null;
-    try {
-      Vertex v1 = new Vertex("v1",
-          new ProcessorDescriptor("MapProcessor"),
-          dummyTaskCount, dummyTaskResource);
-      Vertex v2 = new Vertex("v2",
-          new ProcessorDescriptor("MapProcessor"),
-          dummyTaskCount, dummyTaskResource);
-      Vertex v3 = new Vertex("v3",
-          new ProcessorDescriptor("MapProcessor"),
-          dummyTaskCount, dummyTaskResource);
-      Edge e1 = new Edge(v1, v2,
-          new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-              DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-              new OutputDescriptor("dummy output class"),
-              new InputDescriptor("dummy input class")));
-      Edge e2 = new Edge(v1, v2,
-          new EdgeProperty(DataMovementType.SCATTER_GATHER, 
-              DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, 
-              new OutputDescriptor("dummy output class"),
-              new InputDescriptor("dummy input class")));
-      DAG dag = new DAG("testDag");
-      dag.addVertex(v1);
-      dag.addVertex(v2);
-      dag.addVertex(v3);
-      dag.addEdge(e1);
-      dag.addEdge(e2);
-      dag.verify();
-    }
-    catch (IllegalStateException e){
-      ex = e;
-    }
-    Assert.assertNotNull(ex);
-    System.out.println(ex.getMessage());
-    Assert.assertTrue(ex.getMessage().startsWith("Vertex has outDegree>1"));
-  }
-
-  @Test
-  public void testDagWithNoVertices() {
-    IllegalStateException ex=null;
-    try {
-      DAG dag = new DAG("testDag");
-      dag.verify();
-    }
-    catch (IllegalStateException e){
-      ex = e;
-    }
-    Assert.assertNotNull(ex);
-    System.out.println(ex.getMessage());
-    Assert.assertTrue(ex.getMessage()
-        .startsWith("Invalid dag containing 0 vertices"));
-  }
-
-  @SuppressWarnings("unused")
-  @Test
-  public void testInvalidVertexConstruction() {
-    try {
-      Vertex v1 = new Vertex("v1",
-          new ProcessorDescriptor("MapProcessor"),
-          0, dummyTaskResource);
-      Assert.fail("Expected exception for 0 parallelism");
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue(e.getMessage().startsWith("Parallelism cannot be 0"));
-    }
-    try {
-      Vertex v1 = new Vertex("v1",
-          new ProcessorDescriptor("MapProcessor"),
-          1, null);
-      Assert.fail("Expected exception for 0 parallelism");
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue(e.getMessage().startsWith("Resource cannot be null"));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/pom.xml
----------------------------------------------------------------------
diff --git a/tez-dag/pom.xml b/tez-dag/pom.xml
index 875a196..bc6aeef 100644
--- a/tez-dag/pom.xml
+++ b/tez-dag/pom.xml
@@ -29,6 +29,10 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
       <artifactId>tez-common</artifactId>
     </dependency>
     <dependency>
@@ -55,14 +59,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.tez</groupId>
-      <artifactId>tez-dag-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.tez</groupId>
-      <artifactId>tez-engine-api</artifactId>
-    </dependency>
-    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index 36486c9..7a143a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -44,7 +44,6 @@ import org.apache.tez.common.records.ProceedToCompletionResponse;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.Task;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputConsumable;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
 import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
 import org.apache.tez.dag.app.rm.container.AMContainerImpl;
@@ -56,9 +55,6 @@ import org.apache.tez.engine.api.impl.TezEvent;
 import org.apache.tez.engine.api.impl.TezHeartbeatRequest;
 import org.apache.tez.engine.api.impl.TezHeartbeatResponse;
 import org.apache.tez.engine.common.security.JobTokenSecretManager;
-import org.apache.tez.engine.records.OutputContext;
-import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
-import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
 
 @SuppressWarnings("unchecked")
 public class TaskAttemptListenerImpTezDag extends AbstractService implements
@@ -176,29 +172,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
-  public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
-      int fromEventIdx, int maxEvents,
-      TezTaskAttemptID taskAttemptID) {
-
-    LOG.info("Dependency Completion Events request from " + taskAttemptID
-        + ". fromEventID " + fromEventIdx + " maxEvents " + maxEvents);
-
-    // TODO: shouldReset is never used. See TT. Ask for Removal.
-    boolean shouldReset = false;
-    TezDependentTaskCompletionEvent[] events =
-        context.getCurrentDAG().
-            getVertex(taskAttemptID.getTaskID().getVertexID()).
-                getTaskAttemptCompletionEvents(taskAttemptID, fromEventIdx, maxEvents);
-
-    taskHeartbeatHandler.progressing(taskAttemptID);
-    pingContainerHeartbeatHandler(taskAttemptID);
-
-    // No filters for now. Only required events stored in a vertex.
-
-    return new TezTaskDependencyCompletionEventsUpdate(events,shouldReset);
-  }
-
-  @Override
   public ContainerTask getTask(ContainerContext containerContext)
       throws IOException {
 
@@ -370,17 +343,6 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
   }
 
   @Override
-  public void outputReady(TezTaskAttemptID taskAttemptId,
-      OutputContext outputContext) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("AttemptId: " + taskAttemptId + " reported output context: "
-          + outputContext);
-    }
-    context.getEventHandler().handle(
-        new TaskAttemptEventOutputConsumable(taskAttemptId, outputContext));
-  }
-
-  @Override
   public ProceedToCompletionResponse
       proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
index c476966..2779faf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/EdgeManager.java
@@ -20,9 +20,9 @@ package org.apache.tez.dag.app.dag;
 
 import java.util.List;
 
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.DataMovementEvent;
+import org.apache.tez.engine.api.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.InputReadErrorEvent;
 
 public abstract class EdgeManager {
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputConsumable.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputConsumable.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputConsumable.java
deleted file mode 100644
index f10209f..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputConsumable.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.records.OutputContext;
-
-public class TaskAttemptEventOutputConsumable extends TaskAttemptEvent {
-
-  private final OutputContext outputContext;
-
-  public TaskAttemptEventOutputConsumable(TezTaskAttemptID id,
-      OutputContext outputContext) {
-    super(id, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE);
-    this.outputContext = outputContext;
-  }
-
-  public OutputContext getOutputContext() {
-    return this.outputContext;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
index 71f17ac..b05a6f1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/BroadcastEdgeManager.java
@@ -22,9 +22,9 @@ import java.util.List;
 
 import org.apache.tez.dag.app.dag.EdgeManager;
 import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.DataMovementEvent;
+import org.apache.tez.engine.api.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.InputReadErrorEvent;
 
 public class BroadcastEdgeManager extends EdgeManager {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
index 3605857..060a112 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/Edge.java
@@ -31,14 +31,14 @@ import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.api.events.DataMovementEvent;
+import org.apache.tez.engine.api.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.InputReadErrorEvent;
 import org.apache.tez.engine.api.impl.EventMetaData;
 import org.apache.tez.engine.api.impl.InputSpec;
 import org.apache.tez.engine.api.impl.OutputSpec;
 import org.apache.tez.engine.api.impl.TezEvent;
 import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
-import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
 
 public class Edge {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
index 7c4743e..a916ad2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/OneToOneEdgeManager.java
@@ -22,9 +22,9 @@ import java.util.List;
 
 import org.apache.tez.dag.app.dag.EdgeManager;
 import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.DataMovementEvent;
+import org.apache.tez.engine.api.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.InputReadErrorEvent;
 
 public class OneToOneEdgeManager extends EdgeManager {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
index 380b6b6..1d4df5b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ScatterGatherEdgeManager.java
@@ -22,9 +22,9 @@ import java.util.List;
 
 import org.apache.tez.dag.app.dag.EdgeManager;
 import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.DataMovementEvent;
+import org.apache.tez.engine.api.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.InputReadErrorEvent;
 
 public class ScatterGatherEdgeManager extends EdgeManager {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
index b854a43..a0ed329 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ShuffleVertexManager.java
@@ -39,9 +39,9 @@ import org.apache.tez.dag.app.dag.VertexScheduler;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
-import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+import org.apache.tez.engine.api.events.DataMovementEvent;
+import org.apache.tez.engine.api.events.InputFailedEvent;
+import org.apache.tez.engine.api.events.InputReadErrorEvent;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 1ec1225..74005b7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -106,14 +106,14 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.api.events.DataMovementEvent;
+import org.apache.tez.engine.api.events.InputFailedEvent;
 import org.apache.tez.engine.api.events.TaskAttemptFailedEvent;
 import org.apache.tez.engine.api.events.TaskStatusUpdateEvent;
 import org.apache.tez.engine.api.impl.EventMetaData;
 import org.apache.tez.engine.api.impl.InputSpec;
 import org.apache.tez.engine.api.impl.OutputSpec;
 import org.apache.tez.engine.api.impl.TezEvent;
-import org.apache.tez.engine.newapi.events.DataMovementEvent;
-import org.apache.tez.engine.newapi.events.InputFailedEvent;
 import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
 
 import com.google.common.annotations.VisibleForTesting;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java b/tez-dag/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java
new file mode 100644
index 0000000..7a4dd13
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/engine/records/TezDependentTaskCompletionEvent.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * This is used to track task completion events on 
+ * job tracker. 
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+// TODO TEZAM3 This needs to be more generic. Maybe some kind of a serialized
+// blob - which can be interpreted by the Input plugin.
+public class TezDependentTaskCompletionEvent implements Writable {
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  // TODO EVENTUALLY - Remove TIPFAILED state ?
+  static public enum Status {FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED};
+    
+  private int eventId;
+  private int taskRunTime; // using int since runtime is the time difference
+  private TezTaskAttemptID taskAttemptId;
+  private long dataSize;
+  Status status;
+  byte[] userPayload;
+  // TODO TEZAM2 Get rid of the isMap field. Job specific type information can be determined from TaskAttemptId.getTaskType
+//  boolean isMap = false;
+  public static final TezDependentTaskCompletionEvent[] EMPTY_ARRAY = 
+    new TezDependentTaskCompletionEvent[0];
+
+  public TezDependentTaskCompletionEvent() {
+    taskAttemptId = new TezTaskAttemptID();
+  }
+  
+  /**
+   * Constructor. eventId should be created externally and incremented
+   * per event for each job.
+   * @param eventId event id; should be unique and assigned incrementally, starting from 0
+   * @param taskAttemptId task attempt id
+   * @param status task's status
+   * @param runTime time (in millisec) the task took to complete
+   * @param dataSize size of output produced by the task
+   */
+  public TezDependentTaskCompletionEvent(int eventId, 
+                             TezTaskAttemptID taskAttemptId,
+//                             boolean isMap,
+                             Status status, 
+                             int runTime,
+                             long dataSize){
+      
+    this.taskAttemptId = taskAttemptId;
+//    this.isMap = isMap;
+    this.eventId = eventId; 
+    this.status =status; 
+    this.taskRunTime = runTime;
+    this.dataSize = dataSize;
+  }
+  
+  public TezDependentTaskCompletionEvent clone() {
+    TezDependentTaskCompletionEvent clone = new TezDependentTaskCompletionEvent(
+        this.eventId, this.taskAttemptId, this.status, 
+        this.taskRunTime, this.dataSize);
+    
+    return clone;
+  }
+  
+  /**
+   * Returns event Id. 
+   * @return event id
+   */
+  public int getEventId() {
+    return eventId;
+  }
+
+  /**
+   * Returns task attempt id.
+   * @return task attempt id
+   */
+  public TezTaskAttemptID getTaskAttemptID() {
+    return taskAttemptId;
+  }
+  
+  /**
+   * Returns the status of the task attempt, one of the {@link Status} values.
+   * @return task status
+   */
+  public Status getStatus() {
+    return status;
+  }
+  
+  /**
+   * Returns time (in millisec) the task took to complete. 
+   */
+  public int getTaskRunTime() {
+    return taskRunTime;
+  }
+  
+  /**
+   * Return size of output produced by the task
+   */
+  public long getDataSize() {
+    return dataSize;
+  }
+  
+  /**
+   * @return user payload. Maybe null
+   */
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  /**
+   * Set the task completion time
+   * @param taskCompletionTime time (in millisec) the task took to complete
+   */
+  protected void setTaskRunTime(int taskCompletionTime) {
+    this.taskRunTime = taskCompletionTime;
+  }
+
+  /**
+   * set event Id. should be assigned incrementally starting from 0. 
+   * @param eventId
+   */
+  public void setEventId(int eventId) {
+    this.eventId = eventId;
+  }
+
+  /**
+   * Sets task attempt id.
+   * @param taskId the task attempt id
+   */
+  public void setTaskAttemptID(TezTaskAttemptID taskId) {
+    this.taskAttemptId = taskId;
+  }
+  
+  /**
+   * Set task status. 
+   * @param status
+   */
+  public void setTaskStatus(Status status) {
+    this.status = status;
+  }
+  
+  /**
+   * Set the user payload
+   * @param userPayload
+   */
+  public void setUserPayload(byte[] userPayload) {
+    this.userPayload = userPayload;
+  }
+    
+  @Override
+  public String toString(){
+    StringBuffer buf = new StringBuffer(); 
+    buf.append("Task Id : "); 
+    buf.append(taskAttemptId); 
+    buf.append(", Status : ");  
+    buf.append(status.name());
+    return buf.toString();
+  }
+    
+  @Override
+  public boolean equals(Object o) {
+    // not counting userPayload as that is a piggyback mechanism
+    if(o == null)
+      return false;
+    if(o.getClass().equals(this.getClass())) {
+      TezDependentTaskCompletionEvent event = (TezDependentTaskCompletionEvent) o;
+      return this.eventId == event.getEventId()
+             && this.status.equals(event.getStatus())
+             && this.taskAttemptId.equals(event.getTaskAttemptID()) 
+             && this.taskRunTime == event.getTaskRunTime()
+             && this.dataSize == event.getDataSize();
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return toString().hashCode(); 
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    taskAttemptId.write(out);
+//    out.writeBoolean(isMap);
+    WritableUtils.writeEnum(out, status);
+    WritableUtils.writeVInt(out, taskRunTime);
+    WritableUtils.writeVInt(out, eventId);
+    WritableUtils.writeCompressedByteArray(out, userPayload);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    taskAttemptId.readFields(in);
+//    isMap = in.readBoolean();
+    status = WritableUtils.readEnum(in, Status.class);
+    taskRunTime = WritableUtils.readVInt(in);
+    eventId = WritableUtils.readVInt(in);
+    userPayload = WritableUtils.readCompressedByteArray(in);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java b/tez-dag/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java
new file mode 100644
index 0000000..13c9088
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/engine/records/TezTaskDependencyCompletionEventsUpdate.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.records;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+
+public class TezTaskDependencyCompletionEventsUpdate implements Writable {
+  TezDependentTaskCompletionEvent[] events;
+  boolean reset;
+
+  public TezTaskDependencyCompletionEventsUpdate() { }
+
+  public TezTaskDependencyCompletionEventsUpdate(
+      TezDependentTaskCompletionEvent[] events, boolean reset) {
+    this.events = events;
+    this.reset = reset;
+  }
+
+  public boolean shouldReset() {
+    return reset;
+  }
+
+  public TezDependentTaskCompletionEvent[] getDependentTaskCompletionEvents() {
+    return events;
+  }
+  
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(reset);
+    out.writeInt(events.length);
+    for (TezDependentTaskCompletionEvent event : events) {
+      event.write(out);
+    }
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    reset = in.readBoolean();
+    events = new TezDependentTaskCompletionEvent[in.readInt()];
+    for (int i = 0; i < events.length; ++i) {
+      events[i] = new TezDependentTaskCompletionEvent();
+      events[i].readFields(in);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index d4eae9d..c2457e1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.TezTaskContext;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.ContainerContext;
@@ -66,6 +65,7 @@ import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.engine.api.impl.TaskSpec;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.TokenCache;
 import org.junit.Test;
@@ -109,7 +109,7 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.RUNNING);
     wc.verifyNoOutgoingEvents();
     assertFalse(pulledTask.shouldDie());
-    assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask()
+    assertEquals(wc.taskSpec.getTaskAttemptID(), pulledTask.getTask()
         .getTaskAttemptID());
     assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
@@ -165,7 +165,7 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.RUNNING);
     wc.verifyNoOutgoingEvents();
     assertFalse(pulledTask.shouldDie());
-    assertEquals(wc.tezTaskContext.getTaskAttemptId(), pulledTask.getTask()
+    assertEquals(wc.taskSpec.getTaskAttemptID(), pulledTask.getTask()
         .getTaskAttemptID());
     assertEquals(wc.taskAttemptID, wc.amContainer.getRunningTaskAttempt());
     assertEquals(0, wc.amContainer.getQueuedTaskAttempts().size());
@@ -824,7 +824,7 @@ public class TestAMContainer {
     TezTaskID taskID;
     TezTaskAttemptID taskAttemptID;
 
-    TezTaskContext tezTaskContext;
+    TaskSpec taskSpec;
 
     public AMContainerImpl amContainer;
 
@@ -859,8 +859,8 @@ public class TestAMContainer {
       taskID = new TezTaskID(vertexID, 1);
       taskAttemptID = new TezTaskAttemptID(taskID, 1);
 
-      tezTaskContext = mock(TezTaskContext.class);
-      doReturn(taskAttemptID).when(tezTaskContext).getTaskAttemptId();
+      taskSpec = mock(TaskSpec.class);
+      doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
 
       amContainer = new AMContainerImpl(container, chh, tal,
           appContext);
@@ -904,7 +904,7 @@ public class TestAMContainer {
     public void assignTaskAttempt(TezTaskAttemptID taID) {
       reset(eventHandler);
       amContainer.handle(new AMContainerEventAssignTA(containerID, taID,
-          tezTaskContext));
+          taskSpec));
     }
 
     public AMContainerTask pullTaskToRun() {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dist/src/main/assembly/tez-dist-full.xml
----------------------------------------------------------------------
diff --git a/tez-dist/src/main/assembly/tez-dist-full.xml b/tez-dist/src/main/assembly/tez-dist-full.xml
index 3176dd1..383eb43 100644
--- a/tez-dist/src/main/assembly/tez-dist-full.xml
+++ b/tez-dist/src/main/assembly/tez-dist-full.xml
@@ -24,8 +24,7 @@
     <moduleSet>
       <useAllReactorProjects>true</useAllReactorProjects>
       <includes>
-        <include>org.apache.tez:tez-dag-api</include>
-        <include>org.apache.tez:tez-engine-api</include>
+        <include>org.apache.tez:tez-api</include>
       </includes>
       <binaries>
         <outputDirectory>/</outputDirectory>
@@ -40,8 +39,7 @@
       <outputDirectory>/lib</outputDirectory>
       <!-- Exclude hadoop artifacts. They will be found via HADOOP* env -->
       <excludes>
-        <exclude>org.apache.tez:tez-dag-api</exclude>
-        <exclude>org.apache.tez:tez-engine-api</exclude>
+        <exclude>org.apache.tez:tez-api</exclude>
         <exclude>*:*:test-jar</exclude>
         <exclude>org.apache.hadoop:hadoop-common</exclude>
         <exclude>org.apache.hadoop:hadoop-auth</exclude>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-dist/src/main/assembly/tez-dist.xml
----------------------------------------------------------------------
diff --git a/tez-dist/src/main/assembly/tez-dist.xml b/tez-dist/src/main/assembly/tez-dist.xml
index 01d0d40..aefb8a2 100644
--- a/tez-dist/src/main/assembly/tez-dist.xml
+++ b/tez-dist/src/main/assembly/tez-dist.xml
@@ -24,8 +24,7 @@
     <moduleSet>
       <useAllReactorProjects>true</useAllReactorProjects>
      <includes>
-        <include>org.apache.tez:tez-dag-api</include>
-        <include>org.apache.tez:tez-engine-api</include>
+       <include>org.apache.tez:tez-api</include>
       </includes>
       <binaries>
         <outputDirectory>/</outputDirectory>
@@ -42,8 +41,7 @@
       <useTransitiveFiltering>true</useTransitiveFiltering>
       <!-- Exclude hadoop artifacts. They will be found via HADOOP* env -->
       <excludes>
-        <exclude>org.apache.tez:tez-dag-api</exclude>
-        <exclude>org.apache.tez:tez-engine-api</exclude>
+        <exclude>org.apache.tez:tez-api</exclude>
         <exclude>*:*:test-jar</exclude>
         <exclude>org.apache.hadoop:hadoop-common</exclude>
         <exclude>org.apache.hadoop:hadoop-auth</exclude>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-engine-api/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-engine-api/findbugs-exclude.xml b/tez-engine-api/findbugs-exclude.xml
deleted file mode 100644
index 5b11308..0000000
--- a/tez-engine-api/findbugs-exclude.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-<FindBugsFilter>
-
-</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-engine-api/pom.xml
----------------------------------------------------------------------
diff --git a/tez-engine-api/pom.xml b/tez-engine-api/pom.xml
deleted file mode 100644
index b19e96b..0000000
--- a/tez-engine-api/pom.xml
+++ /dev/null
@@ -1,91 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.tez</groupId>
-    <artifactId>tez</artifactId>
-    <version>0.2.0-SNAPSHOT</version>
-  </parent>
-  <artifactId>tez-engine-api</artifactId>
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.tez</groupId>
-      <artifactId>tez-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-api</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-common</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>com.google.inject</groupId>
-      <artifactId>guice</artifactId>
-    </dependency>
-    <dependency>
-     <groupId>com.google.protobuf</groupId>
-     <artifactId>protobuf-java</artifactId>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.rat</groupId>
-        <artifactId>apache-rat-plugin</artifactId>
-        <configuration>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.hadoop</groupId>
-        <artifactId>hadoop-maven-plugins</artifactId>
-        <executions>
-          <execution>
-            <id>compile-protoc</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>protoc</goal>
-            </goals>
-            <configuration>
-              <protocVersion>${protobuf.version}</protocVersion>
-              <protocCommand>${protoc.path}</protocCommand>
-              <imports>
-                <param>${basedir}/src/main/proto</param>
-              </imports>
-              <source>
-                <directory>${basedir}/src/main/proto</directory>
-                <includes>
-                  <include>Events.proto</include>
-                </includes>
-              </source>
-              <output>${project.build.directory}/generated-sources/java</output>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-engine-api/src/main/java/org/apache/tez/engine/api/Input.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Input.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Input.java
deleted file mode 100644
index 64c3834..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Input.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.api;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * {@link Input} represents a pipe through which an <em>tez</em> task
- * can get input key/value pairs.
- */
-public interface Input {
-  
-  /**
-   * Initialize <code>Input</code>.
-   * 
-   * @param conf job configuration
-   * @param master master process controlling the task
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public void initialize(Configuration conf, Master master) 
-      throws IOException, InterruptedException;
-  
-  /**
-   * Check if there is another key/value pair.
-   * 
-   * @return true if a key/value pair was read
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public boolean hasNext() throws IOException, InterruptedException;
-
-  /**
-   * Get the next key.
-   * 
-   * @return the current key or null if there is no current key
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public Object getNextKey() throws IOException, InterruptedException;
-  
-  /**
-   * Get the next values.
-   * 
-   * @return the object that was read
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public Iterable<Object> getNextValues() 
-      throws IOException, InterruptedException;
-  
-  /**
-   * The current progress of the {@link Input} through its data.
-   * 
-   * @return a number between 0.0 and 1.0 that is the fraction of the data read
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public float getProgress() throws IOException, InterruptedException;
-  
-  /**
-   * Close this <code>Input</code> for future operations.
-   */
-  public void close() throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-engine-api/src/main/java/org/apache/tez/engine/api/Master.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Master.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Master.java
deleted file mode 100644
index f3add9a..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Master.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.api;
-
-import org.apache.hadoop.ipc.ProtocolInfo;
-import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
-
-/**
- * {@link Master} represents the master controlling the {@link Task}. 
- */
-@ProtocolInfo(protocolName = "Master", protocolVersion = 1)
-public interface Master extends VersionedProtocol {
-
-  // TODO TEZAM3 This likely needs to change to be a little more generic.
-  // Many output / input relationships cannot be captured via this. The current
-  // form works primarily works for the existing MR
-
-  TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
-      int fromEventIdx, int maxEventsToFetch,
-      TezTaskAttemptID taskAttemptId);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-engine-api/src/main/java/org/apache/tez/engine/api/Output.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Output.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Output.java
deleted file mode 100644
index daa80d0..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Output.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.api;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.engine.records.OutputContext;
-
-/**
- * {@link Output} represents a pipe through which an <em>tez</em> task
- * can send out outputs.
- */
-public interface Output {
-
-  /**
-   * Initialize <code>Output</code>.
-   * 
-   * @param conf job configuration
-   * @param master master process controlling the task
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public void initialize(Configuration conf, Master master) 
-      throws IOException, InterruptedException;
-
-  /** 
-   * Writes a key/value pair.
-   *
-   * @param key the key to write.
-   * @param value the value to write.
-   * @throws IOException
-   */      
-  public void write(Object key, Object value
-                             ) throws IOException, InterruptedException;
-
-  /**
-   * Returns the OutputContext for the particular <code>Output</code>. 
-   * 
-   * @return the OutputContext for this Output if it exists, otherwise null.
-   */
-  public OutputContext getOutputContext();
-  
-  /** 
-   * Close this <code>Output</code> for future operations.
-   * 
-   * @throws IOException
-   */ 
-  public void close() throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d316f723/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java
deleted file mode 100644
index 550ee73..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Processor.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.api;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * {@link Processor} represents the <em>tez</em> entity responsible for
- * consuming {@link Input} and producing {@link Output}. 
- */
-public interface Processor {
-  
-  /**
-   * Initialize the <code>Processor</code>.
-   * 
-   * @param conf job-configuration
-   * @param master master process controlling the task
-   * @throws IOException 
-   * @throws InterruptedException
-   */
-  public void initialize(Configuration conf, Master master) 
-      throws IOException, InterruptedException;
-  
-  /**
-   * Process input data from <code>input</code> and 
-   * send it to <code>output</code>.
-   * 
-   * @param in input
-   * @param out output
-   * @param master master process controlling the task
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public void process(Input[] in, Output[]  out)
-      throws IOException, InterruptedException;
-
-  /**
-   * Close the {@link Processor}.
-   * 
-   * @throws IOException
-   * @throws InterruptedException
-   */
-  public void close() throws IOException, InterruptedException;
-
-}