You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/26 01:48:51 UTC

[4/7] tez git commit: TEZ-2708. Rename classes and variables post TEZ-2003 changes. (sseth)

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
deleted file mode 100644
index 5159aff..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.ContainerContext;
-import org.apache.tez.common.ContainerTask;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.security.JobTokenIdentifier;
-import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.common.security.TokenCache;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TaskCommunicator;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.serviceplugins.api.ContainerEndReason;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.api.TaskHeartbeatRequest;
-import org.apache.tez.dag.api.TaskHeartbeatResponse;
-import org.apache.tez.dag.api.TezException;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.dag.api.TaskCommunicatorContext;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.Vertex;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
-import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
-import org.apache.tez.dag.app.dag.event.VertexEventType;
-import org.apache.tez.dag.app.rm.container.AMContainer;
-import org.apache.tez.dag.app.rm.container.AMContainerMap;
-import org.apache.tez.dag.app.rm.container.AMContainerTask;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.events.DataMovementEvent;
-import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
-import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
-import org.apache.tez.runtime.api.impl.EventType;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.runtime.api.impl.TezEvent;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-@SuppressWarnings("unchecked")
-// TODO TEZ-2003 (post) TEZ-2696 Rename to TestTezTaskCommunicator
-public class TestTaskAttemptListenerImplTezDag {
-  private ApplicationId appId;
-  private ApplicationAttemptId appAttemptId;
-  private AppContext appContext;
-  Credentials credentials;
-  AMContainerMap amContainerMap;
-  EventHandler eventHandler;
-  DAG dag;
-  TaskAttemptListenerImpTezDag taskAttemptListener;
-  ContainerTask containerTask;
-  AMContainerTask amContainerTask;
-  TaskSpec taskSpec;
-
-  TezVertexID vertexID;
-  TezTaskID taskID;
-  TezTaskAttemptID taskAttemptID;
-
-  @Before
-  public void setUp() {
-    appId = ApplicationId.newInstance(1000, 1);
-    appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-    dag = mock(DAG.class);
-    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
-    vertexID = TezVertexID.getInstance(dagID, 1);
-    taskID = TezTaskID.getInstance(vertexID, 1);
-    taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
-    credentials = new Credentials();
-
-    amContainerMap = mock(AMContainerMap.class);
-    Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
-
-    eventHandler = mock(EventHandler.class);
-
-    MockClock clock = new MockClock();
-    
-    appContext = mock(AppContext.class);
-    doReturn(eventHandler).when(appContext).getEventHandler();
-    doReturn(dag).when(appContext).getCurrentDAG();
-    doReturn(appAcls).when(appContext).getApplicationACLs();
-    doReturn(amContainerMap).when(appContext).getAllContainers();
-    doReturn(clock).when(appContext).getClock();
-    
-    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
-    doReturn(credentials).when(appContext).getAppCredentials();
-    NodeId nodeId = NodeId.newInstance("localhost", 0);
-    AMContainer amContainer = mock(AMContainer.class);
-    Container container = mock(Container.class);
-    doReturn(nodeId).when(container).getNodeId();
-    doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
-    doReturn(container).when(amContainer).getContainer();
-
-    Configuration conf = new TezConfiguration();
-    UserPayload defaultPayload;
-    try {
-      defaultPayload = TezUtils.createUserPayloadFromConf(conf);
-    } catch (IOException e) {
-      throw new TezUncheckedException(e);
-    }
-    taskAttemptListener = new TaskAttemptListenerImplForTest(appContext,
-        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class),
-        Lists.newArrayList(
-            new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
-                .setUserPayload(defaultPayload)));
-
-    taskSpec = mock(TaskSpec.class);
-    doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
-    amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0);
-    containerTask = null;
-  }
-
-  @Test(timeout = 5000)
-  public void testGetTask() throws IOException {
-
-    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0);
-    TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
-
-    ContainerId containerId1 = createContainerId(appId, 1);
-    ContainerContext containerContext1 = new ContainerContext(containerId1.toString());
-    containerTask = tezUmbilical.getTask(containerContext1);
-    assertTrue(containerTask.shouldDie());
-
-    ContainerId containerId2 = createContainerId(appId, 2);
-    ContainerContext containerContext2 = new ContainerContext(containerId2.toString());
-    taskAttemptListener.registerRunningContainer(containerId2, 0);
-    containerTask = tezUmbilical.getTask(containerContext2);
-    assertNull(containerTask);
-
-    // Valid task registered
-    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2, 0);
-    containerTask = tezUmbilical.getTask(containerContext2);
-    assertFalse(containerTask.shouldDie());
-    assertEquals(taskSpec, containerTask.getTaskSpec());
-
-    // Task unregistered. Should respond to heartbeats
-    taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER, null);
-    containerTask = tezUmbilical.getTask(containerContext2);
-    assertNull(containerTask);
-
-    // Container unregistered. Should send a shouldDie = true
-    taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER, null);
-    containerTask = tezUmbilical.getTask(containerContext2);
-    assertTrue(containerTask.shouldDie());
-
-    ContainerId containerId3 = createContainerId(appId, 3);
-    ContainerContext containerContext3 = new ContainerContext(containerId3.toString());
-    taskAttemptListener.registerRunningContainer(containerId3, 0);
-
-    // Register task to container3, followed by unregistering container 3 all together
-    TaskSpec taskSpec2 = mock(TaskSpec.class);
-    TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class);
-    doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
-    AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0);
-    taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0);
-    taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER, null);
-    containerTask = tezUmbilical.getTask(containerContext3);
-    assertTrue(containerTask.shouldDie());
-  }
-
-  @Test(timeout = 5000)
-  public void testGetTaskMultiplePulls() throws IOException {
-    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0);
-    TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
-
-    ContainerId containerId1 = createContainerId(appId, 1);
-
-    ContainerContext containerContext1 = new ContainerContext(containerId1.toString());
-    taskAttemptListener.registerRunningContainer(containerId1, 0);
-    containerTask = tezUmbilical.getTask(containerContext1);
-    assertNull(containerTask);
-
-    // Register task
-    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId1, 0);
-    containerTask = tezUmbilical.getTask(containerContext1);
-    assertFalse(containerTask.shouldDie());
-    assertEquals(taskSpec, containerTask.getTaskSpec());
-
-    // Try pulling again - simulates re-use pull
-    containerTask = tezUmbilical.getTask(containerContext1);
-    assertNull(containerTask);
-  }
-
-  @Test (timeout = 5000)
-  public void testTaskEventRouting() throws Exception {
-    List<TezEvent> events =  Arrays.asList(
-      new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null),
-      new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null),
-      new TezEvent(new TaskAttemptCompletedEvent(), null)
-    );
-
-    generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
-
-    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
-    verify(eventHandler, times(2)).handle(arg.capture());
-    final List<Event> argAllValues = arg.getAllValues();
-
-    final Event statusUpdateEvent = argAllValues.get(0);
-    assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
-        statusUpdateEvent.getType());
-
-    final Event vertexEvent = argAllValues.get(1);
-    final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
-    assertEquals("First event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
-        vertexEvent.getType());
-    assertEquals(EventType.DATA_MOVEMENT_EVENT,
-        vertexRouteEvent.getEvents().get(0).getEventType());
-    assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
-        vertexRouteEvent.getEvents().get(1).getEventType());
-
-  }
-
-  @Test (timeout = 5000)
-  public void testTaskEventRoutingTaskAttemptOnly() throws Exception {
-    List<TezEvent> events = Arrays.asList(
-      new TezEvent(new TaskAttemptCompletedEvent(), null)
-    );
-    generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
-
-    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
-    verify(eventHandler, times(1)).handle(arg.capture());
-    final List<Event> argAllValues = arg.getAllValues();
-
-    final Event event = argAllValues.get(0);
-    assertEquals("only event should be route event", VertexEventType.V_ROUTE_EVENT,
-        event.getType());
-  }
-  
-  @Test (timeout = 5000)
-  public void testTaskHeartbeatResponse() throws Exception {
-    List<TezEvent> events = new ArrayList<TezEvent>();
-    List<TezEvent> eventsToSend = new ArrayList<TezEvent>();
-    TaskHeartbeatResponse response = generateHeartbeat(events, 0, 1, 2, eventsToSend);
-    
-    assertEquals(2, response.getNextFromEventId());
-    assertEquals(eventsToSend, response.getEvents());
-  }
-
-  //try 10 times to allocate random port, fail it if no one is succeed.
-  @Test (timeout = 5000)
-  public void testPortRange() {
-    boolean succeedToAllocate = false;
-    Random rand = new Random();
-    for (int i = 0; i < 10; ++i) {
-      int nextPort = 1024 + rand.nextInt(65535 - 1024);
-      if (testPortRange(nextPort)) {
-        succeedToAllocate = true;
-        break;
-      }
-    }
-    if (!succeedToAllocate) {
-      fail("Can not allocate free port even in 10 iterations for TaskAttemptListener");
-    }
-  }
-
-  // TODO TEZ-2003 Move this into TestTezTaskCommunicator. Potentially other tests as well.
-  @Test (timeout= 5000)
-  public void testPortRange_NotSpecified() throws IOException {
-    Configuration conf = new Configuration();
-    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
-        "fakeIdentifier"));
-    Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
-        new JobTokenSecretManager());
-    sessionToken.setService(identifier.getJobId());
-    TokenCache.setSessionToken(sessionToken, credentials);
-    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
-    taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
-        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), Lists.newArrayList(
-        new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
-            .setUserPayload(userPayload)));
-    // no exception happen, should started properly
-    taskAttemptListener.init(conf);
-    taskAttemptListener.start();
-  }
-
-  private boolean testPortRange(int port) {
-    boolean succeedToAllocate = true;
-    try {
-      Configuration conf = new Configuration();
-      
-      JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
-          "fakeIdentifier"));
-      Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
-          new JobTokenSecretManager());
-      sessionToken.setService(identifier.getJobId());
-      TokenCache.setSessionToken(sessionToken, credentials);
-
-      conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port);
-      UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
-
-      taskAttemptListener = new TaskAttemptListenerImpTezDag(appContext,
-          mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), Lists
-          .newArrayList(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
-              .setUserPayload(userPayload)));
-      taskAttemptListener.init(conf);
-      taskAttemptListener.start();
-      int resultedPort = taskAttemptListener.getTaskCommunicator(0).getAddress().getPort();
-      assertEquals(port, resultedPort);
-    } catch (Exception e) {
-      succeedToAllocate = false;
-    } finally {
-      if (taskAttemptListener != null) {
-        try {
-          taskAttemptListener.close();
-        } catch (IOException e) {
-          e.printStackTrace();
-          fail("fail to stop TaskAttemptListener");
-        }
-      }
-    }
-    return succeedToAllocate;
-  }
-
-  private TaskHeartbeatResponse generateHeartbeat(List<TezEvent> events,
-      int fromEventId, int maxEvents, int nextFromEventId,
-      List<TezEvent> sendEvents) throws IOException, TezException {
-    ContainerId containerId = createContainerId(appId, 1);
-    Vertex vertex = mock(Vertex.class);
-
-    doReturn(vertex).when(dag).getVertex(vertexID);
-    doReturn("test_vertex").when(vertex).getName();
-    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents, 0);
-    doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, 0, maxEvents);
-
-    taskAttemptListener.registerRunningContainer(containerId, 0);
-    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0);
-
-    TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class);
-    doReturn(containerId.toString()).when(request).getContainerIdentifier();
-    doReturn(containerId.toString()).when(request).getContainerIdentifier();
-    doReturn(taskAttemptID).when(request).getTaskAttemptId();
-    doReturn(events).when(request).getEvents();
-    doReturn(maxEvents).when(request).getMaxEvents();
-    doReturn(fromEventId).when(request).getStartIndex();
-
-    return taskAttemptListener.heartbeat(request);
-  }
-
-
-  private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) {
-    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
-    return ContainerId.newInstance(appAttemptId, containerIdx);
-  }
-
-  private static class TaskAttemptListenerImplForTest extends TaskAttemptListenerImpTezDag {
-
-    public TaskAttemptListenerImplForTest(AppContext context,
-                                          TaskHeartbeatHandler thh,
-                                          ContainerHeartbeatHandler chh,
-                                          List<NamedEntityDescriptor> taskCommDescriptors) {
-      super(context, thh, chh, taskCommDescriptors);
-    }
-
-    @Override
-    TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
-      return new TezTaskCommunicatorImplForTest(taskCommunicatorContext);
-    }
-
-  }
-
-  private static class TezTaskCommunicatorImplForTest extends TezTaskCommunicatorImpl {
-
-    public TezTaskCommunicatorImplForTest(
-        TaskCommunicatorContext taskCommunicatorContext) {
-      super(taskCommunicatorContext);
-    }
-
-    @Override
-    protected void startRpcServer() {
-    }
-
-    @Override
-    protected void stopRpcServer() {
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
deleted file mode 100644
index 74468f2..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag2.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.dag.app.dag.DAG;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
-import org.apache.tez.dag.app.rm.container.AMContainer;
-import org.apache.tez.dag.app.rm.container.AMContainerMap;
-import org.apache.tez.dag.app.rm.container.AMContainerTask;
-import org.apache.tez.dag.records.TaskAttemptTerminationCause;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-// TODO TEZ-2003. Rename to TestTaskAttemptListener | whatever TaskAttemptListener is renamed to.
-public class TestTaskAttemptListenerImplTezDag2 {
-
-  @Test(timeout = 5000)
-  public void testTaskAttemptFailedKilled() throws IOException {
-    ApplicationId appId = ApplicationId.newInstance(1000, 1);
-    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-    Credentials credentials = new Credentials();
-    AppContext appContext = mock(AppContext.class);
-    EventHandler eventHandler = mock(EventHandler.class);
-    DAG dag = mock(DAG.class);
-    AMContainerMap amContainerMap = mock(AMContainerMap.class);
-    Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
-    doReturn(eventHandler).when(appContext).getEventHandler();
-    doReturn(dag).when(appContext).getCurrentDAG();
-    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
-    doReturn(credentials).when(appContext).getAppCredentials();
-    doReturn(appAcls).when(appContext).getApplicationACLs();
-    doReturn(amContainerMap).when(appContext).getAllContainers();
-    NodeId nodeId = NodeId.newInstance("localhost", 0);
-    AMContainer amContainer = mock(AMContainer.class);
-    Container container = mock(Container.class);
-    doReturn(nodeId).when(container).getNodeId();
-    doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
-    doReturn(container).when(amContainer).getContainer();
-
-    Configuration conf = new TezConfiguration();
-    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
-    TaskAttemptListenerImpTezDag taskAttemptListener =
-        new TaskAttemptListenerImpTezDag(appContext, mock(TaskHeartbeatHandler.class),
-            mock(ContainerHeartbeatHandler.class), Lists.newArrayList(new NamedEntityDescriptor(
-            TezConstants.getTezYarnServicePluginName(), null).setUserPayload(userPayload)));
-
-    TaskSpec taskSpec1 = mock(TaskSpec.class);
-    TezTaskAttemptID taskAttemptId1 = mock(TezTaskAttemptID.class);
-    doReturn(taskAttemptId1).when(taskSpec1).getTaskAttemptID();
-    AMContainerTask amContainerTask1 = new AMContainerTask(taskSpec1, null, null, false, 10);
-
-    TaskSpec taskSpec2 = mock(TaskSpec.class);
-    TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class);
-    doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
-    AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 10);
-
-    ContainerId containerId1 = createContainerId(appId, 1);
-    taskAttemptListener.registerRunningContainer(containerId1, 0);
-    taskAttemptListener.registerTaskAttempt(amContainerTask1, containerId1, 0);
-    ContainerId containerId2 = createContainerId(appId, 2);
-    taskAttemptListener.registerRunningContainer(containerId2, 0);
-    taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId2, 0);
-
-
-    taskAttemptListener
-        .taskFailed(taskAttemptId1, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1");
-    taskAttemptListener
-        .taskKilled(taskAttemptId2, TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2");
-
-    ArgumentCaptor<Event> argumentCaptor = ArgumentCaptor.forClass(Event.class);
-    verify(eventHandler, times(2)).handle(argumentCaptor.capture());
-    assertTrue(argumentCaptor.getAllValues().get(0) instanceof TaskAttemptEventAttemptFailed);
-    assertTrue(argumentCaptor.getAllValues().get(1) instanceof TaskAttemptEventAttemptKilled);
-    TaskAttemptEventAttemptFailed failedEvent =
-        (TaskAttemptEventAttemptFailed) argumentCaptor.getAllValues().get(0);
-    TaskAttemptEventAttemptKilled killedEvent =
-        (TaskAttemptEventAttemptKilled) argumentCaptor.getAllValues().get(1);
-
-    assertEquals("Diagnostics1", failedEvent.getDiagnosticInfo());
-    assertEquals(TaskAttemptTerminationCause.COMMUNICATION_ERROR,
-        failedEvent.getTerminationCause());
-
-    assertEquals("Diagnostics2", killedEvent.getDiagnosticInfo());
-    assertEquals(TaskAttemptTerminationCause.SERVICE_BUSY, killedEvent.getTerminationCause());
-    // TODO TEZ-2003. Verify unregistration from the registered list
-  }
-
-  private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) {
-    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, 1);
-    ContainerId containerId = ContainerId.newInstance(appAttemptId, containerIdx);
-    return containerId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
index 1545eb4..5222a2d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorContextImpl.java
@@ -34,7 +34,7 @@ public class TestTaskCommunicatorContextImpl {
   @Test(timeout = 5000)
   public void testIsKnownContainer() {
     AppContext appContext = mock(AppContext.class);
-    TaskAttemptListenerImpTezDag tal = mock(TaskAttemptListenerImpTezDag.class);
+    TaskCommunicatorManager tal = mock(TaskCommunicatorManager.class);
 
     AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class), tal, mock(
         ContainerSignatureMatcher.class), appContext);

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
index 4f68fab..be7adde 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager.java
@@ -220,7 +220,7 @@ public class TestTaskCommunicatorManager {
   }
 
 
-  static class TaskCommManagerForMultipleCommTest extends TaskAttemptListenerImpTezDag {
+  static class TaskCommManagerForMultipleCommTest extends TaskCommunicatorManager {
 
     // All variables setup as static since methods being overridden are invoked by the ContainerLauncherRouter ctor,
     // and regular variables will not be initialized at this point.

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
new file mode 100644
index 0000000..e8ce429
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager1.java
@@ -0,0 +1,425 @@
+/*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.ContainerContext;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TaskCommunicator;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.serviceplugins.api.ContainerEndReason;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.dag.api.TaskHeartbeatRequest;
+import org.apache.tez.dag.api.TaskHeartbeatResponse;
+import org.apache.tez.dag.api.TezException;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.tez.common.TezTaskUmbilicalProtocol;
+import org.apache.tez.dag.api.TaskCommunicatorContext;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.Vertex;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
+import org.apache.tez.dag.app.dag.event.VertexEventType;
+import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.dag.app.rm.container.AMContainerTask;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
+import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
+import org.apache.tez.runtime.api.impl.EventType;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+@SuppressWarnings("unchecked")
+public class TestTaskCommunicatorManager1 {
+  private ApplicationId appId;
+  private ApplicationAttemptId appAttemptId;
+  private AppContext appContext;
+  Credentials credentials;
+  AMContainerMap amContainerMap;
+  EventHandler eventHandler;
+  DAG dag;
+  TaskCommunicatorManager taskAttemptListener;
+  ContainerTask containerTask;
+  AMContainerTask amContainerTask;
+  TaskSpec taskSpec;
+
+  TezVertexID vertexID;
+  TezTaskID taskID;
+  TezTaskAttemptID taskAttemptID;
+
+  /**
+   * Builds the fixture shared by all tests: the application/DAG identifiers, a mocked
+   * {@link AppContext} wired to mocked collaborators, and the manager under test configured with
+   * a single descriptor named after the Tez YARN service plugin.
+   */
+  @Before
+  public void setUp() {
+    // Identifier chain: application -> attempt, and DAG -> vertex -> task -> task attempt.
+    appId = ApplicationId.newInstance(1000, 1);
+    appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
+    vertexID = TezVertexID.getInstance(dagId, 1);
+    taskID = TezTaskID.getInstance(vertexID, 1);
+    taskAttemptID = TezTaskAttemptID.getInstance(taskID, 1);
+
+    // Collaborators that the mocked AppContext hands out.
+    dag = mock(DAG.class);
+    credentials = new Credentials();
+    amContainerMap = mock(AMContainerMap.class);
+    eventHandler = mock(EventHandler.class);
+    Map<ApplicationAccessType, String> acls = new HashMap<ApplicationAccessType, String>();
+    MockClock mockClock = new MockClock();
+
+    appContext = mock(AppContext.class);
+    doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
+    doReturn(credentials).when(appContext).getAppCredentials();
+    doReturn(acls).when(appContext).getApplicationACLs();
+    doReturn(eventHandler).when(appContext).getEventHandler();
+    doReturn(dag).when(appContext).getCurrentDAG();
+    doReturn(amContainerMap).when(appContext).getAllContainers();
+    doReturn(mockClock).when(appContext).getClock();
+
+    // Any container id lookup resolves to one mocked AMContainer whose Container reports
+    // the node localhost:0.
+    NodeId localNode = NodeId.newInstance("localhost", 0);
+    Container yarnContainer = mock(Container.class);
+    AMContainer amContainer = mock(AMContainer.class);
+    doReturn(localNode).when(yarnContainer).getNodeId();
+    doReturn(yarnContainer).when(amContainer).getContainer();
+    doReturn(amContainer).when(amContainerMap).get(any(ContainerId.class));
+
+    // A default TezConfiguration, serialized, is used as the descriptor payload.
+    Configuration tezConf = new TezConfiguration();
+    UserPayload defaultPayload;
+    try {
+      defaultPayload = TezUtils.createUserPayloadFromConf(tezConf);
+    } catch (IOException e) {
+      throw new TezUncheckedException(e);
+    }
+
+    taskAttemptListener = new TaskCommunicatorManagerInterfaceImplForTest(
+        appContext,
+        mock(TaskHeartbeatHandler.class),
+        mock(ContainerHeartbeatHandler.class),
+        Lists.newArrayList(
+            new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+                .setUserPayload(defaultPayload)));
+
+    // Task fixture: a mocked spec that reports the fixture task attempt id.
+    taskSpec = mock(TaskSpec.class);
+    doReturn(taskAttemptID).when(taskSpec).getTaskAttemptID();
+    amContainerTask = new AMContainerTask(taskSpec, null, null, false, 0);
+    containerTask = null;
+  }
+
+  /**
+   * Walks a container through its lifecycle as seen over the umbilical and checks what
+   * {@code getTask} returns at each step: unknown container, registered container without a
+   * task, registered task, unregistered task, and unregistered container.
+   */
+  @Test(timeout = 5000)
+  public void testGetTask() throws IOException {
+
+    TezTaskCommunicatorImpl taskCommunicator = (TezTaskCommunicatorImpl)taskAttemptListener.getTaskCommunicator(0);
+    TezTaskUmbilicalProtocol tezUmbilical = taskCommunicator.getUmbilical();
+
+    // Container 1 is never registered, so it is told to die.
+    ContainerId containerId1 = createContainerId(appId, 1);
+    ContainerContext containerContext1 = new ContainerContext(containerId1.toString());
+    containerTask = tezUmbilical.getTask(containerContext1);
+    assertTrue(containerTask.shouldDie());
+
+    // Container 2 is registered but has no task yet.
+    ContainerId containerId2 = createContainerId(appId, 2);
+    ContainerContext containerContext2 = new ContainerContext(containerId2.toString());
+    taskAttemptListener.registerRunningContainer(containerId2, 0);
+    containerTask = tezUmbilical.getTask(containerContext2);
+    assertNull(containerTask);
+
+    // Valid task registered
+    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId2, 0);
+    containerTask = tezUmbilical.getTask(containerContext2);
+    assertFalse(containerTask.shouldDie());
+    assertEquals(taskSpec, containerTask.getTaskSpec());
+
+    // Task unregistered. Should respond to heartbeats
+    taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER, null);
+    containerTask = tezUmbilical.getTask(containerContext2);
+    assertNull(containerTask);
+
+    // Container unregistered. Should send a shouldDie = true
+    taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER, null);
+    containerTask = tezUmbilical.getTask(containerContext2);
+    assertTrue(containerTask.shouldDie());
+
+    ContainerId containerId3 = createContainerId(appId, 3);
+    ContainerContext containerContext3 = new ContainerContext(containerId3.toString());
+    taskAttemptListener.registerRunningContainer(containerId3, 0);
+
+    // Register task to container3, followed by unregistering container 3 all together
+    TaskSpec taskSpec2 = mock(TaskSpec.class);
+    TezTaskAttemptID taskAttemptId2 = mock(TezTaskAttemptID.class);
+    doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
+    // Build the second task from taskSpec2 (not the fixture taskSpec) so that the attempt
+    // registered on container 3 is the one mocked just above.
+    AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec2, null, null, false, 0);
+    taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0);
+    taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER, null);
+    containerTask = tezUmbilical.getTask(containerContext3);
+    assertTrue(containerTask.shouldDie());
+  }
+
+  /**
+   * Verifies that a registered task is handed to the container only once: the first pull after
+   * registration returns the task, and a second pull returns null.
+   */
+  @Test(timeout = 5000)
+  public void testGetTaskMultiplePulls() throws IOException {
+    TezTaskUmbilicalProtocol umbilical =
+        ((TezTaskCommunicatorImpl) taskAttemptListener.getTaskCommunicator(0)).getUmbilical();
+    ContainerId containerId = createContainerId(appId, 1);
+    ContainerContext context = new ContainerContext(containerId.toString());
+
+    // Container is known but has no task assigned yet.
+    taskAttemptListener.registerRunningContainer(containerId, 0);
+    containerTask = umbilical.getTask(context);
+    assertNull(containerTask);
+
+    // Once a task is registered, the next pull delivers it.
+    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0);
+    containerTask = umbilical.getTask(context);
+    assertFalse(containerTask.shouldDie());
+    assertEquals(taskSpec, containerTask.getTaskSpec());
+
+    // Pulling again - simulates a re-use pull - yields nothing new.
+    containerTask = umbilical.getTask(context);
+    assertNull(containerTask);
+  }
+
+  /**
+   * Sends a heartbeat carrying a status update, a data movement event and a task attempt
+   * completed event, and checks that the handler receives two events: first a
+   * TA_STATUS_UPDATE, then a V_ROUTE_EVENT wrapping the remaining two events in order.
+   */
+  @Test (timeout = 5000)
+  public void testTaskEventRouting() throws Exception {
+    List<TezEvent> events =  Arrays.asList(
+      new TezEvent(new TaskStatusUpdateEvent(null, 0.0f, null), null),
+      new TezEvent(DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])), null),
+      new TezEvent(new TaskAttemptCompletedEvent(), null)
+    );
+
+    generateHeartbeat(events, 0, 1, 0, new ArrayList<TezEvent>());
+
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(2)).handle(arg.capture());
+    final List<Event> argAllValues = arg.getAllValues();
+
+    final Event statusUpdateEvent = argAllValues.get(0);
+    assertEquals("First event should be status update", TaskAttemptEventType.TA_STATUS_UPDATE,
+        statusUpdateEvent.getType());
+
+    // The message previously said "First event", although this asserts on the second one.
+    final Event vertexEvent = argAllValues.get(1);
+    final VertexEventRouteEvent vertexRouteEvent = (VertexEventRouteEvent)vertexEvent;
+    assertEquals("Second event should be routed to vertex", VertexEventType.V_ROUTE_EVENT,
+        vertexEvent.getType());
+    assertEquals(EventType.DATA_MOVEMENT_EVENT,
+        vertexRouteEvent.getEvents().get(0).getEventType());
+    assertEquals(EventType.TASK_ATTEMPT_COMPLETED_EVENT,
+        vertexRouteEvent.getEvents().get(1).getEventType());
+  }
+
+  /**
+   * Verifies that a heartbeat carrying only a TaskAttemptCompletedEvent results in exactly one
+   * event on the handler, of type V_ROUTE_EVENT.
+   */
+  @Test (timeout = 5000)
+  public void testTaskEventRoutingTaskAttemptOnly() throws Exception {
+    List<TezEvent> heartbeatEvents = Arrays.asList(
+        new TezEvent(new TaskAttemptCompletedEvent(), null));
+    generateHeartbeat(heartbeatEvents, 0, 1, 0, new ArrayList<TezEvent>());
+
+    ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(1)).handle(captor.capture());
+
+    // Exactly one invocation was verified, so the captured value is that single event.
+    final Event routed = captor.getValue();
+    assertEquals("only event should be route event", VertexEventType.V_ROUTE_EVENT,
+        routed.getType());
+  }
+  
+  /**
+   * Verifies that the heartbeat response carries the next-from-event id (2) and the event list
+   * passed to generateHeartbeat as the events to send.
+   */
+  @Test (timeout = 5000)
+  public void testTaskHeartbeatResponse() throws Exception {
+    List<TezEvent> expectedEvents = new ArrayList<TezEvent>();
+    TaskHeartbeatResponse response =
+        generateHeartbeat(new ArrayList<TezEvent>(), 0, 1, 2, expectedEvents);
+
+    assertEquals(2, response.getNextFromEventId());
+    assertEquals(expectedEvents, response.getEvents());
+  }
+
+  // Tries up to 10 randomly chosen ports and fails only if none of them could be allocated.
+  @Test (timeout = 5000)
+  public void testPortRange() {
+    Random random = new Random();
+    for (int attempt = 0; attempt < 10; ++attempt) {
+      int candidatePort = 1024 + random.nextInt(65535 - 1024);
+      if (testPortRange(candidatePort)) {
+        // One successful allocation is enough.
+        return;
+      }
+    }
+    fail("Can not allocate free port even in 10 iterations for TaskAttemptListener");
+  }
+
+  // TODO TEZ-2003 Move this into TestTezTaskCommunicator. Potentially other tests as well.
+  /**
+   * Verifies that the manager initializes and starts without an exception when no port range
+   * is configured. The manager is closed afterwards, as in testPortRange(int), so that a
+   * started instance is not left running after the test.
+   */
+  @Test (timeout= 5000)
+  public void testPortRange_NotSpecified() throws IOException {
+    Configuration conf = new Configuration();
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(
+        "fakeIdentifier"));
+    Token<JobTokenIdentifier> sessionToken = new Token<JobTokenIdentifier>(identifier,
+        new JobTokenSecretManager());
+    sessionToken.setService(identifier.getJobId());
+    TokenCache.setSessionToken(sessionToken, credentials);
+    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
+    taskAttemptListener = new TaskCommunicatorManager(appContext,
+        mock(TaskHeartbeatHandler.class), mock(ContainerHeartbeatHandler.class), Lists.newArrayList(
+        new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+            .setUserPayload(userPayload)));
+    try {
+      // no exception should happen; the manager should start properly
+      taskAttemptListener.init(conf);
+      taskAttemptListener.start();
+    } finally {
+      // Previously the started manager was never closed.
+      taskAttemptListener.close();
+    }
+  }
+
+  /**
+   * Attempts to start a TaskCommunicatorManager restricted to the single-port range
+   * {@code port-port}, and always closes it again.
+   *
+   * @param port the only port in the configured task/AM port range
+   * @return true if the manager started and reported {@code port} as its address port; false
+   *         if any exception was thrown while setting it up or starting it
+   */
+  private boolean testPortRange(int port) {
+    try {
+      Configuration conf = new Configuration();
+      conf.set(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE, port + "-" + port);
+
+      JobTokenIdentifier tokenId = new JobTokenIdentifier(new Text("fakeIdentifier"));
+      Token<JobTokenIdentifier> sessionToken =
+          new Token<JobTokenIdentifier>(tokenId, new JobTokenSecretManager());
+      sessionToken.setService(tokenId.getJobId());
+      TokenCache.setSessionToken(sessionToken, credentials);
+
+      UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
+      taskAttemptListener = new TaskCommunicatorManager(
+          appContext,
+          mock(TaskHeartbeatHandler.class),
+          mock(ContainerHeartbeatHandler.class),
+          Lists.newArrayList(
+              new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+                  .setUserPayload(payload)));
+      taskAttemptListener.init(conf);
+      taskAttemptListener.start();
+
+      int boundPort = taskAttemptListener.getTaskCommunicator(0).getAddress().getPort();
+      assertEquals(port, boundPort);
+      return true;
+    } catch (Exception e) {
+      // Any exception counts as "could not allocate" so the caller can try another port.
+      return false;
+    } finally {
+      if (taskAttemptListener != null) {
+        try {
+          taskAttemptListener.close();
+        } catch (IOException e) {
+          e.printStackTrace();
+          fail("fail to stop TaskAttemptListener");
+        }
+      }
+    }
+  }
+
+  /**
+   * Registers container 1 and the fixture task attempt with the manager, then sends one mocked
+   * heartbeat request for that attempt.
+   *
+   * @param events events returned by the mocked request
+   * @param fromEventId start index returned by the mocked request
+   * @param maxEvents maximum event count returned by the mocked request
+   * @param nextFromEventId next-from-event id placed in the stubbed vertex answer
+   * @param sendEvents events placed in the stubbed vertex answer
+   * @return the response produced by the manager for the heartbeat
+   */
+  private TaskHeartbeatResponse generateHeartbeat(List<TezEvent> events,
+      int fromEventId, int maxEvents, int nextFromEventId,
+      List<TezEvent> sendEvents) throws IOException, TezException {
+    ContainerId containerId = createContainerId(appId, 1);
+    Vertex vertex = mock(Vertex.class);
+
+    // The DAG resolves the fixture vertex id to a mocked vertex that answers the event query.
+    doReturn(vertex).when(dag).getVertex(vertexID);
+    doReturn("test_vertex").when(vertex).getName();
+    TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(nextFromEventId, sendEvents, 0);
+    doReturn(eventInfo).when(vertex).getTaskAttemptTezEvents(taskAttemptID, fromEventId, 0, maxEvents);
+
+    taskAttemptListener.registerRunningContainer(containerId, 0);
+    taskAttemptListener.registerTaskAttempt(amContainerTask, containerId, 0);
+
+    // The container identifier was previously stubbed twice; once is sufficient.
+    TaskHeartbeatRequest request = mock(TaskHeartbeatRequest.class);
+    doReturn(containerId.toString()).when(request).getContainerIdentifier();
+    doReturn(taskAttemptID).when(request).getTaskAttemptId();
+    doReturn(events).when(request).getEvents();
+    doReturn(maxEvents).when(request).getMaxEvents();
+    doReturn(fromEventId).when(request).getStartIndex();
+
+    return taskAttemptListener.heartbeat(request);
+  }
+
+
+  /**
+   * Creates the id of container {@code containerIdx} under attempt 1 of {@code applicationId}.
+   */
+  private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) {
+    return ContainerId.newInstance(
+        ApplicationAttemptId.newInstance(applicationId, 1), containerIdx);
+  }
+
+  /**
+   * TaskCommunicatorManager whose default task communicator is replaced with
+   * {@link TezTaskCommunicatorImplForTest}.
+   */
+  private static class TaskCommunicatorManagerInterfaceImplForTest extends TaskCommunicatorManager {
+
+    public TaskCommunicatorManagerInterfaceImplForTest(
+        AppContext context,
+        TaskHeartbeatHandler thh,
+        ContainerHeartbeatHandler chh,
+        List<NamedEntityDescriptor> taskCommDescriptors) {
+      super(context, thh, chh, taskCommDescriptors);
+    }
+
+    /** Returns the test communicator instead of the default implementation. */
+    @Override
+    TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
+      return new TezTaskCommunicatorImplForTest(taskCommunicatorContext);
+    }
+  }
+
+  /**
+   * TezTaskCommunicatorImpl with the RPC server start and stop hooks overridden as no-ops.
+   */
+  private static class TezTaskCommunicatorImplForTest extends TezTaskCommunicatorImpl {
+
+    public TezTaskCommunicatorImplForTest(TaskCommunicatorContext taskCommunicatorContext) {
+      super(taskCommunicatorContext);
+    }
+
+    @Override
+    protected void startRpcServer() {
+      // Intentionally empty.
+    }
+
+    @Override
+    protected void stopRpcServer() {
+      // Intentionally empty.
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
new file mode 100644
index 0000000..d75b0e5
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskCommunicatorManager2.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
+import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
+import org.apache.tez.dag.app.rm.container.AMContainer;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.dag.app.rm.container.AMContainerTask;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestTaskCommunicatorManager2 {
+
+  /**
+   * Registers two task attempts on two containers, reports the first as failed and the second
+   * as killed, and checks the type, diagnostics and termination cause of the two events that
+   * reach the event handler.
+   */
+  @Test(timeout = 5000)
+  public void testTaskAttemptFailedKilled() throws IOException {
+    ApplicationId applicationId = ApplicationId.newInstance(1000, 1);
+    ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(applicationId, 1);
+
+    // Mocked AppContext and the collaborators it returns.
+    Credentials creds = new Credentials();
+    AppContext context = mock(AppContext.class);
+    EventHandler handler = mock(EventHandler.class);
+    DAG currentDag = mock(DAG.class);
+    AMContainerMap containers = mock(AMContainerMap.class);
+    Map<ApplicationAccessType, String> acls = new HashMap<ApplicationAccessType, String>();
+    doReturn(handler).when(context).getEventHandler();
+    doReturn(currentDag).when(context).getCurrentDAG();
+    doReturn(attemptId).when(context).getApplicationAttemptId();
+    doReturn(creds).when(context).getAppCredentials();
+    doReturn(acls).when(context).getApplicationACLs();
+    doReturn(containers).when(context).getAllContainers();
+
+    // Any container id lookup resolves to one mocked AMContainer on node localhost:0.
+    NodeId localNode = NodeId.newInstance("localhost", 0);
+    AMContainer amContainer = mock(AMContainer.class);
+    Container yarnContainer = mock(Container.class);
+    doReturn(localNode).when(yarnContainer).getNodeId();
+    doReturn(amContainer).when(containers).get(any(ContainerId.class));
+    doReturn(yarnContainer).when(amContainer).getContainer();
+
+    Configuration conf = new TezConfiguration();
+    UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
+    TaskCommunicatorManager manager = new TaskCommunicatorManager(
+        context,
+        mock(TaskHeartbeatHandler.class),
+        mock(ContainerHeartbeatHandler.class),
+        Lists.newArrayList(
+            new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+                .setUserPayload(payload)));
+
+    // Two independent attempts, each backed by a mocked spec and a mocked attempt id.
+    TaskSpec specA = mock(TaskSpec.class);
+    TezTaskAttemptID attemptA = mock(TezTaskAttemptID.class);
+    doReturn(attemptA).when(specA).getTaskAttemptID();
+    AMContainerTask taskA = new AMContainerTask(specA, null, null, false, 10);
+
+    TaskSpec specB = mock(TaskSpec.class);
+    TezTaskAttemptID attemptB = mock(TezTaskAttemptID.class);
+    doReturn(attemptB).when(specB).getTaskAttemptID();
+    AMContainerTask taskB = new AMContainerTask(specB, null, null, false, 10);
+
+    // Attempt A runs on container 1, attempt B on container 2.
+    ContainerId containerA = createContainerId(applicationId, 1);
+    manager.registerRunningContainer(containerA, 0);
+    manager.registerTaskAttempt(taskA, containerA, 0);
+    ContainerId containerB = createContainerId(applicationId, 2);
+    manager.registerRunningContainer(containerB, 0);
+    manager.registerTaskAttempt(taskB, containerB, 0);
+
+    manager.taskFailed(attemptA, TaskAttemptEndReason.COMMUNICATION_ERROR, "Diagnostics1");
+    manager.taskKilled(attemptB, TaskAttemptEndReason.EXECUTOR_BUSY, "Diagnostics2");
+
+    ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
+    verify(handler, times(2)).handle(captor.capture());
+    Event first = captor.getAllValues().get(0);
+    Event second = captor.getAllValues().get(1);
+    assertTrue(first instanceof TaskAttemptEventAttemptFailed);
+    assertTrue(second instanceof TaskAttemptEventAttemptKilled);
+    TaskAttemptEventAttemptFailed failed = (TaskAttemptEventAttemptFailed) first;
+    TaskAttemptEventAttemptKilled killed = (TaskAttemptEventAttemptKilled) second;
+
+    assertEquals("Diagnostics1", failed.getDiagnosticInfo());
+    assertEquals(TaskAttemptTerminationCause.COMMUNICATION_ERROR,
+        failed.getTerminationCause());
+
+    assertEquals("Diagnostics2", killed.getDiagnosticInfo());
+    assertEquals(TaskAttemptTerminationCause.SERVICE_BUSY, killed.getTerminationCause());
+    // TODO TEZ-2003. Verify unregistration from the registered list
+  }
+
+  /**
+   * Creates the id of container {@code containerIdx} under attempt 1 of {@code applicationId}.
+   */
+  private ContainerId createContainerId(ApplicationId applicationId, int containerIdx) {
+    return ContainerId.newInstance(
+        ApplicationAttemptId.newInstance(applicationId, 1), containerIdx);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
index 83421a2..f0b89c8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestCommit.java
@@ -70,7 +70,7 @@ import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ClusterInfo;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.DAGTerminationCause;
@@ -142,7 +142,7 @@ public class TestCommit {
   private TaskEventDispatcher taskEventDispatcher;
   private VertexEventDispatcher vertexEventDispatcher;
   private DagEventDispatcher dagEventDispatcher;
-  private TaskAttemptListener taskAttemptListener;
+  private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
   private TaskHeartbeatHandler thh;
   private Clock clock = new SystemClock();
   private DAGFinishEventHandler dagFinishEventHandler;
@@ -317,7 +317,7 @@ public class TestCommit {
     doReturn(historyEventHandler).when(appContext).getHistoryHandler();
     doReturn(aclManager).when(appContext).getAMACLManager();
     dag = new DAGImpl(dagId, conf, dagPlan, dispatcher.getEventHandler(),
-        taskAttemptListener, fsTokens, clock, "user", thh, appContext);
+        taskCommunicatorManagerInterface, fsTokens, clock, "user", thh, appContext);
     doReturn(dag).when(appContext).getCurrentDAG();
     doReturn(dispatcher.getEventHandler()).when(appContext).getEventHandler();
     ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index e268a99..ac4f61b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -88,7 +88,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ClusterInfo;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAGScheduler;
 import org.apache.tez.dag.app.dag.DAGState;
@@ -165,7 +165,7 @@ public class TestDAGImpl {
   private TaskEventDispatcher taskEventDispatcher;
   private VertexEventDispatcher vertexEventDispatcher;
   private DagEventDispatcher dagEventDispatcher;
-  private TaskAttemptListener taskAttemptListener;
+  private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
   private TaskHeartbeatHandler thh;
   private Clock clock = new SystemClock();
   private DAGFinishEventHandler dagFinishEventHandler;
@@ -784,7 +784,7 @@ public class TestDAGImpl {
     doReturn(historyEventHandler).when(appContext).getHistoryHandler();
     doReturn(aclManager).when(appContext).getAMACLManager();
     dag = new DAGImpl(dagId, conf, dagPlan,
-        dispatcher.getEventHandler(),  taskAttemptListener,
+        dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
         fsTokens, clock, "user", thh, appContext);
     dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
     doReturn(dag).when(appContext).getCurrentDAG();
@@ -795,7 +795,7 @@ public class TestDAGImpl {
     mrrDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 2);
     mrrDagPlan = createTestMRRDAGPlan();
     mrrDag = new DAGImpl(mrrDagId, conf, mrrDagPlan,
-        dispatcher.getEventHandler(),  taskAttemptListener,
+        dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
         fsTokens, clock, "user", thh,
         mrrAppContext);
     mrrDag.entityUpdateTracker = new StateChangeNotifierForTest(mrrDag);
@@ -811,7 +811,7 @@ public class TestDAGImpl {
     groupDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 3);
     groupDagPlan = createGroupDAGPlan();
     groupDag = new DAGImpl(groupDagId, conf, groupDagPlan,
-        dispatcher.getEventHandler(),  taskAttemptListener,
+        dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
         fsTokens, clock, "user", thh,
         groupAppContext);
     groupDag.entityUpdateTracker = new StateChangeNotifierForTest(groupDag);
@@ -881,7 +881,7 @@ public class TestDAGImpl {
     dagWithCustomEdgeAppContext = mock(AppContext.class);
     doReturn(aclManager).when(dagWithCustomEdgeAppContext).getAMACLManager();
     dagWithCustomEdge = new DAGImpl(dagWithCustomEdgeId, conf, dagPlanWithCustomEdge,
-        dispatcher.getEventHandler(),  taskAttemptListener,
+        dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
         fsTokens, clock, "user", thh, dagWithCustomEdgeAppContext);
     dagWithCustomEdge.entityUpdateTracker = new StateChangeNotifierForTest(dagWithCustomEdge);
     doReturn(conf).when(dagWithCustomEdgeAppContext).getAMConf();

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index 792fa63..409c506 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -37,7 +37,7 @@ import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ClusterInfo;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.DAGTerminationCause;
@@ -91,7 +91,7 @@ public class TestDAGRecovery {
     DAGPlan dagPlan = TestDAGImpl.createTestDAGPlan();
     dag =
         new DAGImpl(dagId, new Configuration(), dagPlan, mockEventHandler,
-            mock(TaskAttemptListener.class), new Credentials(),
+            mock(TaskCommunicatorManagerInterface.class), new Credentials(),
             new SystemClock(), user, mock(TaskHeartbeatHandler.class),
             mockAppContext);
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 04bb2df..13c9202 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -64,7 +64,7 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ClusterInfo;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
@@ -145,7 +145,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(
         TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
+        mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(),
         mock(TaskHeartbeatHandler.class), mock(AppContext.class),
         false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
 
@@ -180,12 +180,12 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(
         TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
+        mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(),
         mock(TaskHeartbeatHandler.class), mock(AppContext.class),
         false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
 
     TaskAttemptImpl taImplReScheduled = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
+        mock(TaskCommunicatorManagerInterface.class), new Configuration(), new SystemClock(),
         mock(TaskHeartbeatHandler.class), mock(AppContext.class),
         true, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
 
@@ -243,7 +243,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(
         TezVertexID.getInstance(TezDAGID.getInstance("1", 1, 1), 1), 1);
     TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
-        mock(TaskAttemptListener.class), new Configuration(),
+        mock(TaskCommunicatorManagerInterface.class), new Configuration(),
         new SystemClock(), mock(TaskHeartbeatHandler.class),
         mock(AppContext.class), false, Resource.newInstance(1024,
             1), createFakeContainerContext(), false);
@@ -285,7 +285,7 @@ public class TestTaskAttempt {
     TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
 
     MockEventHandler eventHandler = new MockEventHandler();
-    TaskAttemptListener taListener = createMockTaskAttemptListener();
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -334,7 +334,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = createMockTaskAttemptListener();
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -354,7 +354,7 @@ public class TestTaskAttempt {
 
     AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
         new ContainerContextMatcher(), appCtx);
     containers.addContainerIfNew(container, 0, 0, 0);
 
@@ -434,7 +434,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = new MockEventHandler();
-    TaskAttemptListener taListener = createMockTaskAttemptListener();
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -454,7 +454,7 @@ public class TestTaskAttempt {
 
     AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
         new ContainerContextMatcher(), appCtx);
     containers.addContainerIfNew(container, 0, 0, 0);
 
@@ -498,7 +498,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = createMockTaskAttemptListener();
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -518,7 +518,7 @@ public class TestTaskAttempt {
 
     AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
         new ContainerContextMatcher(), appCtx);
     containers.addContainerIfNew(container, 0, 0, 0);
 
@@ -589,7 +589,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = createMockTaskAttemptListener();
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -610,7 +610,7 @@ public class TestTaskAttempt {
 
     AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
         new ContainerContextMatcher(), appCtx);
     containers.addContainerIfNew(container, 0, 0, 0);
 
@@ -719,7 +719,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = createMockTaskAttemptListener();
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -740,7 +740,7 @@ public class TestTaskAttempt {
 
     AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
         new ContainerContextMatcher(), appCtx);
     containers.addContainerIfNew(container, 0, 0, 0);
 
@@ -810,7 +810,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = createMockTaskAttemptListener();
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -830,7 +830,7 @@ public class TestTaskAttempt {
 
     AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
         new ContainerContextMatcher(), appCtx);
     containers.addContainerIfNew(container, 0, 0, 0);
 
@@ -904,7 +904,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = createMockTaskAttemptListener();
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -924,7 +924,7 @@ public class TestTaskAttempt {
 
     AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
         new ContainerContextMatcher(), appCtx);
     containers.addContainerIfNew(container, 0, 0, 0);
 
@@ -1006,7 +1006,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = createMockTaskAttemptListener();
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1026,7 +1026,7 @@ public class TestTaskAttempt {
 
     AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
         new ContainerContextMatcher(), appCtx);
     containers.addContainerIfNew(container, 0, 0, 0);
 
@@ -1105,7 +1105,7 @@ public class TestTaskAttempt {
 
     MockEventHandler mockEh = new MockEventHandler();
     MockEventHandler eventHandler = spy(mockEh);
-    TaskAttemptListener taListener = createMockTaskAttemptListener();
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1125,7 +1125,7 @@ public class TestTaskAttempt {
 
     AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
         new ContainerContextMatcher(), appCtx);
     containers.addContainerIfNew(container, 0, 0, 0);
 
@@ -1249,7 +1249,7 @@ public class TestTaskAttempt {
     TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
 
     MockEventHandler eventHandler = spy(new MockEventHandler());
-    TaskAttemptListener taListener = createMockTaskAttemptListener();
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
 
     Configuration taskConf = new Configuration();
     taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
@@ -1268,7 +1268,7 @@ public class TestTaskAttempt {
 
     AppContext appCtx = mock(AppContext.class);
     AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
         new ContainerContextMatcher(), appCtx);
     containers.addContainerIfNew(container, 0, 0, 0);
 
@@ -1324,7 +1324,7 @@ public class TestTaskAttempt {
     public int taskAttemptStartedEventLogged = 0;
     public int taskAttemptFinishedEventLogged = 0;
     public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
-        EventHandler eventHandler, TaskAttemptListener tal,
+        EventHandler eventHandler, TaskCommunicatorManagerInterface tal,
         Configuration conf, Clock clock,
         TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
         boolean isRescheduled,
@@ -1378,8 +1378,8 @@ public class TestTaskAttempt {
         new Credentials(), new HashMap<String, String>(), "");
   }
 
-  private TaskAttemptListener createMockTaskAttemptListener() {
-    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+  private TaskCommunicatorManagerInterface createMockTaskAttemptListener() {
+    TaskCommunicatorManagerInterface taListener = mock(TaskCommunicatorManagerInterface.class);
     TaskCommunicator taskComm = mock(TaskCommunicator.class);
     doReturn(new InetSocketAddress("localhost", 0)).when(taskComm).getAddress();
     doReturn(taskComm).when(taListener).getTaskCommunicator(0);

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 4a797e0..6bbfc3d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -42,7 +42,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
@@ -146,7 +146,7 @@ public class TestTaskAttemptRecovery {
         TezTaskID.fromString("task_1407371892933_0001_1_00_000000");
     ta =
         new TaskAttemptImpl(taskId, 0, mockEventHandler,
-            mock(TaskAttemptListener.class), new Configuration(),
+            mock(TaskCommunicatorManagerInterface.class), new Configuration(),
             new SystemClock(), mock(TaskHeartbeatHandler.class),
             mockAppContext, false, Resource.newInstance(1, 1),
             mock(ContainerContext.class), false, mockTask);

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 807f277..24c9664 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -50,7 +50,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.TaskStateInternal;
@@ -85,7 +85,7 @@ public class TestTaskImpl {
   private final int partition = 1;
 
   private Configuration conf;
-  private TaskAttemptListener taskAttemptListener;
+  private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
   private TaskHeartbeatHandler taskHeartbeatHandler;
   private Credentials credentials;
   private Clock clock;
@@ -122,7 +122,7 @@ public class TestTaskImpl {
   @Before
   public void setup() {
     conf = new Configuration();
-    taskAttemptListener = mock(TaskAttemptListener.class);
+    taskCommunicatorManagerInterface = mock(TaskCommunicatorManagerInterface.class);
     taskHeartbeatHandler = mock(TaskHeartbeatHandler.class);
     credentials = new Credentials();
     clock = new SystemClock();
@@ -151,7 +151,7 @@ public class TestTaskImpl {
     eventHandler = new TestEventHandler();
     
     mockTask = new MockTaskImpl(vertexId, partition,
-        eventHandler, conf, taskAttemptListener, clock,
+        eventHandler, conf, taskCommunicatorManagerInterface, clock,
         taskHeartbeatHandler, appContext, leafVertex,
         taskResource, containerContext, vertex);
     mockTaskSpec = mock(TaskSpec.class);
@@ -698,11 +698,11 @@ public class TestTaskImpl {
 
     public MockTaskImpl(TezVertexID vertexId, int partition,
         EventHandler eventHandler, Configuration conf,
-        TaskAttemptListener taskAttemptListener, Clock clock,
+        TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock,
         TaskHeartbeatHandler thh, AppContext appContext, boolean leafVertex,
         Resource resource,
         ContainerContext containerContext, Vertex vertex) {
-      super(vertexId, partition, eventHandler, conf, taskAttemptListener,
+      super(vertexId, partition, eventHandler, conf, taskCommunicatorManagerInterface,
           clock, thh, appContext, leafVertex, resource,
           containerContext, mock(StateChangeNotifier.class), vertex);
       this.vertex = vertex;
@@ -711,7 +711,7 @@ public class TestTaskImpl {
     @Override
     protected TaskAttemptImpl createAttempt(int attemptNumber, TezTaskAttemptID schedCausalTA) {
       MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
-          attemptNumber, eventHandler, taskAttemptListener,
+          attemptNumber, eventHandler, taskCommunicatorManagerInterface,
           conf, clock, taskHeartbeatHandler, appContext,
           true, taskResource, containerContext, schedCausalTA);
       taskAttempts.add(attempt);
@@ -757,7 +757,7 @@ public class TestTaskImpl {
     private TaskAttemptState state = TaskAttemptState.NEW;
 
     public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
-        EventHandler eventHandler, TaskAttemptListener tal, Configuration conf,
+        EventHandler eventHandler, TaskCommunicatorManagerInterface tal, Configuration conf,
         Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
         boolean isRescheduled,
         Resource resource, ContainerContext containerContext, TezTaskAttemptID schedCausalTA) {

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index 1d22e06..eca8274 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -46,7 +46,7 @@ import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
@@ -189,7 +189,7 @@ public class TestTaskRecovery {
     when(mockAppContext.getHistoryHandler()).thenReturn(mockHistoryEventHandler);
     task =
         new TaskImpl(vertexId, 0, dispatcher.getEventHandler(),
-            new Configuration(), mock(TaskAttemptListener.class),
+            new Configuration(), mock(TaskCommunicatorManagerInterface.class),
             new SystemClock(), mock(TaskHeartbeatHandler.class),
             mockAppContext, false, Resource.newInstance(1, 1),
             mock(ContainerContext.class), mock(StateChangeNotifier.class), vertex);