You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/26 01:48:49 UTC

[2/7] tez git commit: TEZ-2708. Rename classes and variables post TEZ-2003 changes. (sseth)

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 0a02f9e..0e90681 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -68,7 +68,7 @@ import org.apache.tez.dag.app.ClusterInfo;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.DAGAppMasterState;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest;
@@ -76,7 +76,7 @@ import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.CapturingEventHandler;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable;
-import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerManagerForTest;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext;
 import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
 import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
@@ -136,7 +136,7 @@ public class TestContainerReuse {
     doReturn(conf).when(appContext).getAMConf();
     AMContainerMap amContainerMap = new AMContainerMap(
       mock(ContainerHeartbeatHandler.class),
-      mock(TaskAttemptListener.class),
+      mock(TaskCommunicatorManagerInterface.class),
       new ContainerContextMatcher(), appContext);
     AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
@@ -145,18 +145,18 @@ public class TestContainerReuse {
     doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
 
-    TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
-        new TaskSchedulerEventHandlerForTest(
+    TaskSchedulerManager taskSchedulerManagerReal =
+        new TaskSchedulerManagerForTest(
           appContext, eventHandler, rmClient,
           new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(conf));
-    TaskSchedulerEventHandler taskSchedulerEventHandler =
-        spy(taskSchedulerEventHandlerReal);
-    taskSchedulerEventHandler.init(conf);
-    taskSchedulerEventHandler.start();
+    TaskSchedulerManager taskSchedulerManager =
+        spy(taskSchedulerManagerReal);
+    taskSchedulerManager.init(conf);
+    taskSchedulerManager.start();
 
     TaskSchedulerWithDrainableContext taskScheduler =
       (TaskSchedulerWithDrainableContext)
-        ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+        ((TaskSchedulerManagerForTest) taskSchedulerManager)
         .getSpyTaskScheduler();
     TaskSchedulerContextDrainable drainableAppCallback =
       taskScheduler.getDrainableAppCallback();
@@ -192,10 +192,10 @@ public class TestContainerReuse {
         defaultRack, priority);
 
     drainNotifier.set(false);
-    taskSchedulerEventHandler.handleEvent(lrTa11);
+    taskSchedulerManager.handleEvent(lrTa11);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainNotifier.set(false);
-    taskSchedulerEventHandler.handleEvent(lrTa21);
+    taskSchedulerManager.handleEvent(lrTa21);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     Container containerHost1 = createContainer(1, host1[0], resource, priority);
@@ -206,28 +206,28 @@ public class TestContainerReuse {
       Lists.newArrayList(containerHost1, containerHost2));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(
+    verify(taskSchedulerManager).taskAllocated(
       eq(0), eq(ta11), any(Object.class), eq(containerHost1));
-    verify(taskSchedulerEventHandler).taskAllocated(
+    verify(taskSchedulerManager).taskAllocated(
       eq(0), eq(ta21), any(Object.class), eq(containerHost2));
 
     // Adding the event later so that task1 assigned to containerHost1
     // is deterministic.
-    taskSchedulerEventHandler.handleEvent(lrTa31);
+    taskSchedulerManager.handleEvent(lrTa31);
 
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(
             ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0));
     drainableAppCallback.drain();
     verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
-    verify(taskSchedulerEventHandler, times(1)).taskAllocated(
+    verify(taskSchedulerManager, times(1)).taskAllocated(
       eq(0), eq(ta31), any(Object.class), eq(containerHost1));
     verify(rmClient, times(0)).releaseAssignedContainer(
       eq(containerHost1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
 
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
       new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
         TaskAttemptState.SUCCEEDED, null, null, 0));
 
@@ -235,7 +235,7 @@ public class TestContainerReuse {
     Throwable exception = null;
     while (System.currentTimeMillis() < currentTs + 5000l) {
       try {
-        verify(taskSchedulerEventHandler,
+        verify(taskSchedulerManager,
           times(1)).containerBeingReleased(eq(0), eq(containerHost2.getId()));
         exception = null;
         break;
@@ -245,7 +245,7 @@ public class TestContainerReuse {
     }
     assertTrue("containerHost2 was not released", exception == null);
     taskScheduler.shutdown();
-    taskSchedulerEventHandler.close();
+    taskSchedulerManager.close();
   }
 
   @Test(timeout = 15000l)
@@ -272,7 +272,7 @@ public class TestContainerReuse {
     doReturn(new Configuration(false)).when(appContext).getAMConf();
     AMContainerMap amContainerMap = new AMContainerMap(
       mock(ContainerHeartbeatHandler.class),
-      mock(TaskAttemptListener.class),
+      mock(TaskCommunicatorManagerInterface.class),
       new ContainerContextMatcher(), appContext);
     AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
@@ -281,17 +281,17 @@ public class TestContainerReuse {
     doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
 
-    TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
-      new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
+    TaskSchedulerManager taskSchedulerManagerReal =
+      new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient,
         new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(conf));
-    TaskSchedulerEventHandler taskSchedulerEventHandler =
-      spy(taskSchedulerEventHandlerReal);
-    taskSchedulerEventHandler.init(conf);
-    taskSchedulerEventHandler.start();
+    TaskSchedulerManager taskSchedulerManager =
+      spy(taskSchedulerManagerReal);
+    taskSchedulerManager.init(conf);
+    taskSchedulerManager.start();
 
     TaskSchedulerWithDrainableContext taskScheduler =
       (TaskSchedulerWithDrainableContext)
-        ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+        ((TaskSchedulerManagerForTest) taskSchedulerManager)
           .getSpyTaskScheduler();
     TaskSchedulerContextDrainable drainableAppCallback =
       taskScheduler.getDrainableAppCallback();
@@ -321,10 +321,10 @@ public class TestContainerReuse {
       taID31, ta31, resource, host1, defaultRack, priority);
 
     drainNotifier.set(false);
-    taskSchedulerEventHandler.handleEvent(lrTa11);
+    taskSchedulerManager.handleEvent(lrTa11);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainNotifier.set(false);
-    taskSchedulerEventHandler.handleEvent(lrTa21);
+    taskSchedulerManager.handleEvent(lrTa21);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     Container containerHost1 = createContainer(1, host1[0], resource, priority);
@@ -334,26 +334,26 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1));
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta21), any(Object.class),
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1));
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta21), any(Object.class),
         eq(containerHost2));
 
     // Adding the event later so that task1 assigned to containerHost1 is deterministic.
-    taskSchedulerEventHandler.handleEvent(lrTa31);
+    taskSchedulerManager.handleEvent(lrTa31);
 
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
             TaskAttemptState.SUCCEEDED, null, null, 0));
     drainableAppCallback.drain();
     verifyDeAllocateTask(taskScheduler, ta21, true, null, null);
-    verify(taskSchedulerEventHandler, times(0)).taskAllocated(
+    verify(taskSchedulerManager, times(0)).taskAllocated(
         eq(0), eq(ta31), any(Object.class), eq(containerHost2));
     verify(rmClient, times(1)).releaseAssignedContainer(
       eq(containerHost2.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
 
     taskScheduler.shutdown();
-    taskSchedulerEventHandler.close();
+    taskSchedulerManager.close();
   }
 
   @Test(timeout = 10000l)
@@ -375,7 +375,7 @@ public class TestContainerReuse {
     AppContext appContext = mock(AppContext.class);
     doReturn(new Configuration(false)).when(appContext).getAMConf();
     AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
-        mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
+        mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
     AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
     doReturn(amNodeTracker).when(appContext).getNodeTracker();
@@ -383,12 +383,13 @@ public class TestContainerReuse {
     doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
 
-    TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
-    TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
-    taskSchedulerEventHandler.init(tezConf);
-    taskSchedulerEventHandler.start();
+    TaskSchedulerManager
+        taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
+    TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+    taskSchedulerManager.init(tezConf);
+    taskSchedulerManager.start();
 
-    TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+    TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager)
         .getSpyTaskScheduler();
     TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
@@ -423,10 +424,10 @@ public class TestContainerReuse {
     TaskAttempt ta14 = mock(TaskAttempt.class);
     AMSchedulerEventTALaunchRequest lrEvent4 = createLaunchRequestEvent(taID14, ta14, resource1, host2, racks, priority1);
 
-    taskSchedulerEventHandler.handleEvent(lrEvent1);
-    taskSchedulerEventHandler.handleEvent(lrEvent2);
-    taskSchedulerEventHandler.handleEvent(lrEvent3);
-    taskSchedulerEventHandler.handleEvent(lrEvent4);
+    taskSchedulerManager.handleEvent(lrEvent1);
+    taskSchedulerManager.handleEvent(lrEvent2);
+    taskSchedulerManager.handleEvent(lrEvent3);
+    taskSchedulerManager.handleEvent(lrEvent4);
 
     Container container1 = createContainer(1, "host1", resource1, priority1);
 
@@ -435,39 +436,39 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class),
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class),
         eq(container1));
 
     // Task assigned to container completed successfully. Container should be re-used.
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null,
             null, 0));
     drainableAppCallback.drain();
     verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1));
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
 
     // Task assigned to container completed successfully.
     // Verify reuse across hosts.
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null,
             null, 0));
     drainableAppCallback.drain();
     verifyDeAllocateTask(taskScheduler, ta12, true, null, null);
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class),
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class),
         eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     eventHandler.reset();
 
     // Verify no re-use if a previous task fails.
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null,
             "TIMEOUT", 0));
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class),
+    verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class),
         eq(container1));
     verifyDeAllocateTask(taskScheduler, ta13, false, null, "TIMEOUT");
     verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
@@ -481,11 +482,11 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container2));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta14), any(Object.class),
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta14), any(Object.class),
         eq(container2));
 
     // Task assigned to container completed successfully. No pending requests. Container should be released.
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null,
             null, 0));
     drainableAppCallback.drain();
@@ -495,7 +496,7 @@ public class TestContainerReuse {
     eventHandler.reset();
 
     taskScheduler.shutdown();
-    taskSchedulerEventHandler.close();
+    taskSchedulerManager.close();
   }
 
   @Test(timeout = 10000l)
@@ -521,7 +522,7 @@ public class TestContainerReuse {
     AppContext appContext = mock(AppContext.class);
     doReturn(new Configuration(false)).when(appContext).getAMConf();
     AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
-        mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
+        mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
     AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
     doReturn(amNodeTracker).when(appContext).getNodeTracker();
@@ -530,14 +531,14 @@ public class TestContainerReuse {
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
 
     //Use ContainerContextMatcher here.  Otherwise it would not match the JVM options
-    TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
-        new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
-    TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
-    taskSchedulerEventHandler.init(tezConf);
-    taskSchedulerEventHandler.start();
+    TaskSchedulerManager taskSchedulerManagerReal =
+        new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
+    TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+    taskSchedulerManager.init(tezConf);
+    taskSchedulerManager.start();
 
     TaskSchedulerWithDrainableContext taskScheduler =
-        (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+        (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager)
           .getSpyTaskScheduler();
     TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
@@ -573,10 +574,10 @@ public class TestContainerReuse {
         createLaunchRequestEvent(taID12, ta12, resource1, host1, racks, priority1);
 
     drainNotifier.set(false);
-    taskSchedulerEventHandler.handleEvent(lrEvent1);
+    taskSchedulerManager.handleEvent(lrEvent1);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainNotifier.set(false);
-    taskSchedulerEventHandler.handleEvent(lrEvent2);
+    taskSchedulerManager.handleEvent(lrEvent2);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     Container container1 = createContainer(1, "host1", resource1, priority1);
@@ -586,16 +587,16 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class),
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class),
         eq(container1));
 
     // First task had profiling on. This container can not be reused further.
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null,
             null, 0));
     drainableAppCallback.drain();
     verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
-    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta12), any(Object.class),
+    verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta12), any(Object.class),
         eq(container1));
     verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
@@ -623,10 +624,10 @@ public class TestContainerReuse {
 
     Container container2 = createContainer(2, "host2", resource1, priority1);
     drainNotifier.set(false);
-    taskSchedulerEventHandler.handleEvent(lrEvent3);
+    taskSchedulerManager.handleEvent(lrEvent3);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainNotifier.set(false);
-    taskSchedulerEventHandler.handleEvent(lrEvent4);
+    taskSchedulerManager.handleEvent(lrEvent4);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     // Container started
@@ -634,16 +635,16 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container2));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2));
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2));
 
     // Verify that the container can not be reused when profiling option is turned on
     // Even for 2 tasks having same profiling option can have container reusability.
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null,
             null, 0));
     drainableAppCallback.drain();
     verifyDeAllocateTask(taskScheduler, ta13, true, null, null);
-    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class),
+    verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class),
         eq(container2));
     verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId()));
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
@@ -675,27 +676,27 @@ public class TestContainerReuse {
 
     // Container started
     Container container3 = createContainer(2, "host3", resource1, priority1);
-    taskSchedulerEventHandler.handleEvent(lrEvent5);
-    taskSchedulerEventHandler.handleEvent(lrEvent6);
+    taskSchedulerManager.handleEvent(lrEvent5);
+    taskSchedulerManager.handleEvent(lrEvent6);
 
     drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container3));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta15), any(Object.class),
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta15), any(Object.class),
         eq(container3));
 
     //Ensure task 6 (of vertex 1) is allocated to same container
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null,
             null, 0));
     drainableAppCallback.drain();
     verifyDeAllocateTask(taskScheduler, ta15, true, null, null);
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3));
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3));
     eventHandler.reset();
 
     taskScheduler.shutdown();
-    taskSchedulerEventHandler.close();
+    taskSchedulerManager.close();
   }
 
   @Test(timeout = 30000l)
@@ -721,7 +722,7 @@ public class TestContainerReuse {
     doReturn(new Configuration(false)).when(appContext).getAMConf();
     AMContainerMap amContainerMap = new AMContainerMap(
         mock(ContainerHeartbeatHandler.class),
-        mock(TaskAttemptListener.class),
+        mock(TaskCommunicatorManagerInterface.class),
         new ContainerContextMatcher(), appContext);
     AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
@@ -730,18 +731,18 @@ public class TestContainerReuse {
     doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
 
-    TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
-        new TaskSchedulerEventHandlerForTest(
+    TaskSchedulerManager taskSchedulerManagerReal =
+        new TaskSchedulerManagerForTest(
           appContext, eventHandler, rmClient,
           new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
-    TaskSchedulerEventHandler taskSchedulerEventHandler =
-        spy(taskSchedulerEventHandlerReal);
-    taskSchedulerEventHandler.init(tezConf);
-    taskSchedulerEventHandler.start();
+    TaskSchedulerManager taskSchedulerManager =
+        spy(taskSchedulerManagerReal);
+    taskSchedulerManager.init(tezConf);
+    taskSchedulerManager.start();
 
     TaskSchedulerWithDrainableContext taskScheduler =
       (TaskSchedulerWithDrainableContext)
-        ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+        ((TaskSchedulerManagerForTest) taskSchedulerManager)
         .getSpyTaskScheduler();
     TaskSchedulerContextDrainable drainableAppCallback =
       taskScheduler.getDrainableAppCallback();
@@ -772,7 +773,7 @@ public class TestContainerReuse {
 
     // Send launch request for task 1 only, deterministic assignment to this task.
     drainNotifier.set(false);
-    taskSchedulerEventHandler.handleEvent(lrEvent11);
+    taskSchedulerManager.handleEvent(lrEvent11);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     Container container1 = createContainer(1, "randomHost", resource1, priority);
@@ -782,21 +783,21 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(
+    verify(taskSchedulerManager).taskAllocated(
         eq(0), eq(ta11), any(Object.class), eq(container1));
 
     // Send launch request for task2 (vertex2)
-    taskSchedulerEventHandler.handleEvent(lrEvent12);
+    taskSchedulerManager.handleEvent(lrEvent12);
 
     // Task assigned to container completed successfully.
     // Container should not be immediately assigned to task 2
     // until delay expires.
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta11, container1.getId(),
             TaskAttemptState.SUCCEEDED, null, null, 0));
     drainableAppCallback.drain();
     verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
-    verify(taskSchedulerEventHandler, times(0)).taskAllocated(
+    verify(taskSchedulerManager, times(0)).taskAllocated(
         eq(0), eq(ta12), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -804,11 +805,11 @@ public class TestContainerReuse {
 
     LOG.info("Sleeping to ensure that the scheduling loop runs");
     Thread.sleep(3000l);
-    verify(taskSchedulerEventHandler).taskAllocated(
+    verify(taskSchedulerManager).taskAllocated(
         eq(0), eq(ta12), any(Object.class), eq(container1));
 
     // TA12 completed.
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
       new AMSchedulerEventTAEnded(ta12, container1.getId(),
         TaskAttemptState.SUCCEEDED, null, null, 0));
     drainableAppCallback.drain();
@@ -818,7 +819,7 @@ public class TestContainerReuse {
     eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
 
     taskScheduler.shutdown();
-    taskSchedulerEventHandler.close();
+    taskSchedulerManager.close();
   }
 
   @Test(timeout = 30000l)
@@ -845,7 +846,7 @@ public class TestContainerReuse {
     doReturn(new Configuration(false)).when(appContext).getAMConf();
     AMContainerMap amContainerMap = new AMContainerMap(
       mock(ContainerHeartbeatHandler.class),
-      mock(TaskAttemptListener.class),
+      mock(TaskCommunicatorManagerInterface.class),
       new ContainerContextMatcher(), appContext);
     AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
@@ -855,17 +856,17 @@ public class TestContainerReuse {
     doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
 
-    TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
-      new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
+    TaskSchedulerManager taskSchedulerManagerReal =
+      new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient,
         new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
-    TaskSchedulerEventHandler taskSchedulerEventHandler =
-      spy(taskSchedulerEventHandlerReal);
-    taskSchedulerEventHandler.init(tezConf);
-    taskSchedulerEventHandler.start();
+    TaskSchedulerManager taskSchedulerManager =
+      spy(taskSchedulerManagerReal);
+    taskSchedulerManager.init(tezConf);
+    taskSchedulerManager.start();
 
     TaskSchedulerWithDrainableContext taskScheduler =
       (TaskSchedulerWithDrainableContext)
-        ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+        ((TaskSchedulerManagerForTest) taskSchedulerManager)
           .getSpyTaskScheduler();
     TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
 
@@ -900,7 +901,7 @@ public class TestContainerReuse {
 
     // Send launch request for task 1 onle, deterministic assignment to this task.
     drainNotifier.set(false);
-    taskSchedulerEventHandler.handleEvent(lrEvent11);
+    taskSchedulerManager.handleEvent(lrEvent11);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     Container container1 = createContainer(1, host1[0], resource1, priority1);
@@ -910,25 +911,25 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(
+    verify(taskSchedulerManager).taskAllocated(
         eq(0), eq(ta11), any(Object.class), eq(container1));
 
     // Send launch request for task2 (vertex2)
-    taskSchedulerEventHandler.handleEvent(lrEvent21);
+    taskSchedulerManager.handleEvent(lrEvent21);
 
     // Task assigned to container completed successfully.
     // Container should  be assigned to task21.
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta11, container1.getId(),
             TaskAttemptState.SUCCEEDED, null, null, 0));
     drainableAppCallback.drain();
     verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
-    verify(taskSchedulerEventHandler).taskAllocated(
+    verify(taskSchedulerManager).taskAllocated(
         eq(0), eq(ta21), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
 
     // Task 2 completes.
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta21, container1.getId(),
             TaskAttemptState.SUCCEEDED, null, null, 0));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -939,7 +940,7 @@ public class TestContainerReuse {
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
 
     taskScheduler.shutdown();
-    taskSchedulerEventHandler.close();
+    taskSchedulerManager.close();
   }
   
   @Test(timeout = 30000l)
@@ -962,7 +963,7 @@ public class TestContainerReuse {
     doReturn(new Configuration(false)).when(appContext).getAMConf();
     ChangingDAGIDAnswer dagIDAnswer = new ChangingDAGIDAnswer(dagID1);
     AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
-        mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
+        mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
     AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
     doReturn(amNodeTracker).when(appContext).getNodeTracker();
@@ -971,12 +972,13 @@ public class TestContainerReuse {
     doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
     
-    TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
-    TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
-    taskSchedulerEventHandler.init(tezConf);
-    taskSchedulerEventHandler.start();
+    TaskSchedulerManager
+        taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
+    TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+    taskSchedulerManager.init(tezConf);
+    taskSchedulerManager.start();
 
-    TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+    TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager)
         .getSpyTaskScheduler();
     TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
@@ -1013,10 +1015,10 @@ public class TestContainerReuse {
     AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, dag1LRs);
 
     drainNotifier.set(false);
-    taskSchedulerEventHandler.handleEvent(lrEvent11);
+    taskSchedulerManager.handleEvent(lrEvent11);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainNotifier.set(false);
-    taskSchedulerEventHandler.handleEvent(lrEvent12);
+    taskSchedulerManager.handleEvent(lrEvent12);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     Container container1 = createContainer(1, "host1", resource1, priority1);
@@ -1026,17 +1028,17 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1));
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1));
     assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
     assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
     
     // Task assigned to container completed successfully. Container should be re-used.
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null,
             null, 0));
     drainableAppCallback.drain();
     verifyDeAllocateTask(taskScheduler, ta111, true, null, null);
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
@@ -1045,7 +1047,7 @@ public class TestContainerReuse {
 
     // Task assigned to container completed successfully.
     // Verify reuse across hosts.
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null,
             null, 0));
     drainableAppCallback.drain();
@@ -1074,14 +1076,14 @@ public class TestContainerReuse {
     TaskAttempt ta212 = mock(TaskAttempt.class);
     AMSchedulerEventTALaunchRequest lrEvent22 = createLaunchRequestEvent(taID212, ta212, resource1, host1, racks, priority1, dag2LRs);
 
-    taskSchedulerEventHandler.handleEvent(lrEvent21);
-    taskSchedulerEventHandler.handleEvent(lrEvent22);
+    taskSchedulerManager.handleEvent(lrEvent21);
+    taskSchedulerManager.handleEvent(lrEvent22);
     drainableAppCallback.drain();
 
     // TODO This is terrible, need a better way to ensure the scheduling loop has run
     LOG.info("Sleeping to ensure that the scheduling loop runs");
     Thread.sleep(6000l);
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class),
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class),
         eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -1089,12 +1091,12 @@ public class TestContainerReuse {
     assertEquals(2, assignEvent.getRemoteTaskLocalResources().size());
     eventHandler.reset();
 
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null,
             null, 0));
     drainableAppCallback.drain();
     verifyDeAllocateTask(taskScheduler, ta211, true, null, null);
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1));
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
@@ -1102,7 +1104,7 @@ public class TestContainerReuse {
     eventHandler.reset();
 
     taskScheduler.shutdown();
-    taskSchedulerEventHandler.close();
+    taskSchedulerManager.close();
   }
 
   @Test(timeout = 30000l)
@@ -1125,7 +1127,7 @@ public class TestContainerReuse {
     doReturn(new Configuration(false)).when(appContext).getAMConf();
     ChangingDAGIDAnswer dagIDAnswer = new ChangingDAGIDAnswer(dagID1);
     AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
-        mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
+        mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
     AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
     doReturn(amNodeTracker).when(appContext).getNodeTracker();
@@ -1134,14 +1136,14 @@ public class TestContainerReuse {
     doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
 
-    TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
-        new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
+    TaskSchedulerManager taskSchedulerManagerReal =
+        new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient,
             new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
-    TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
-    taskSchedulerEventHandler.init(tezConf);
-    taskSchedulerEventHandler.start();
+    TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+    taskSchedulerManager.init(tezConf);
+    taskSchedulerManager.start();
 
-    TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+    TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager)
         .getSpyTaskScheduler();
     TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
@@ -1185,10 +1187,10 @@ public class TestContainerReuse {
     AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, v12LR);
 
     drainNotifier.set(false);
-    taskSchedulerEventHandler.handleEvent(lrEvent11);
+    taskSchedulerManager.handleEvent(lrEvent11);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainNotifier.set(false);
-    taskSchedulerEventHandler.handleEvent(lrEvent12);
+    taskSchedulerManager.handleEvent(lrEvent12);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
 
     Container container1 = createContainer(1, "host1", resource1, priority1);
@@ -1199,17 +1201,17 @@ public class TestContainerReuse {
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1));
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1));
     assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
     assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
 
     // Task assigned to container completed successfully. Container should be re-used.
-    taskSchedulerEventHandler.handleEvent(
+    taskSchedulerManager.handleEvent(
         new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null,
             null, 0));
     drainableAppCallback.drain();
     verifyDeAllocateTask(taskScheduler, ta111, true, null, null);
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
     eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
     assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
@@ -1218,7 +1220,7 @@ public class TestContainerReuse {
 
     // Task assigned to container completed successfully.
     // Verify reuse across hosts.
-    taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0));
+    taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0));
     drainableAppCallback.drain();
     verifyDeAllocateTask(taskScheduler, ta112, true, null, null);
     verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -1243,7 +1245,7 @@ public class TestContainerReuse {
     AMSchedulerEventTALaunchRequest lrEvent21 = createLaunchRequestEvent(taID211, ta211, resource1,
         host1, racks, priority1, v21LR);
 
-    taskSchedulerEventHandler.handleEvent(lrEvent21);
+    taskSchedulerManager.handleEvent(lrEvent21);
     drainableAppCallback.drain();
 
     // TODO This is terrible, need a better way to ensure the scheduling loop has run
@@ -1254,11 +1256,11 @@ public class TestContainerReuse {
 
     Thread.sleep(6000l);
     verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
-    verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2));
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2));
     eventHandler.reset();
 
     taskScheduler.shutdown();
-    taskSchedulerEventHandler.close();
+    taskSchedulerManager.close();
   }
 
   @Test(timeout = 10000l)
@@ -1281,7 +1283,7 @@ public class TestContainerReuse {
     AppContext appContext = mock(AppContext.class);
     doReturn(new Configuration(false)).when(appContext).getAMConf();
     AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
-        mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
+        mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
     AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
     doReturn(amContainerMap).when(appContext).getAllContainers();
     doReturn(amNodeTracker).when(appContext).getNodeTracker();
@@ -1290,15 +1292,15 @@ public class TestContainerReuse {
     doReturn(dagID).when(appContext).getCurrentDAGID();
     doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
 
-    TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
-        new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
+    TaskSchedulerManager taskSchedulerManagerReal =
+        new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient,
             new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
-    TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
-    taskSchedulerEventHandler.init(tezConf);
-    taskSchedulerEventHandler.start();
+    TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+    taskSchedulerManager.init(tezConf);
+    taskSchedulerManager.start();
 
     TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext)
-        ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+        ((TaskSchedulerManagerForTest) taskSchedulerManager)
         .getSpyTaskScheduler();
     TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
@@ -1318,7 +1320,7 @@ public class TestContainerReuse {
     TaskAttempt ta11 = mock(TaskAttempt.class);
     AMSchedulerEventTALaunchRequest lrEvent1 = createLaunchRequestEvent(taID11, ta11, resource1,
         host1, racks, priority1);
-    taskSchedulerEventHandler.handleEvent(lrEvent1);
+    taskSchedulerManager.handleEvent(lrEvent1);
 
     Container container1 = createContainer(1, "host1", resource1, priority1);
 
@@ -1326,10 +1328,10 @@ public class TestContainerReuse {
     drainNotifier.set(false);
     taskScheduler.onContainersAllocated(Collections.singletonList(container1));
     drainableAppCallback.drain();
-    verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta11),
+    verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta11),
         any(Object.class), eq(container1));
     taskScheduler.shutdown();
-    taskSchedulerEventHandler.close();
+    taskSchedulerManager.close();
   }
 
   private Container createContainer(int id, String host, Resource resource, Priority priority) {

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
deleted file mode 100644
index c85be6c..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ /dev/null
@@ -1,707 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.ContainerSignatureMatcher;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.client.DAGClientServer;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
-import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
-import org.apache.tez.dag.app.dag.impl.TaskImpl;
-import org.apache.tez.dag.app.dag.impl.VertexImpl;
-import org.apache.tez.dag.app.rm.container.AMContainer;
-import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
-import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
-import org.apache.tez.dag.app.rm.container.AMContainerEventType;
-import org.apache.tez.dag.app.rm.container.AMContainerMap;
-import org.apache.tez.dag.app.rm.container.AMContainerState;
-import org.apache.tez.dag.app.web.WebUIService;
-import org.apache.tez.dag.records.TaskAttemptTerminationCause;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.serviceplugins.api.TaskScheduler;
-import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-@SuppressWarnings("rawtypes")
-public class TestTaskSchedulerEventHandler {
-  
-  class TestEventHandler implements EventHandler{
-    List<Event> events = Lists.newLinkedList();
-    @Override
-    public void handle(Event event) {
-      events.add(event);
-    }
-  }
-  
-  class MockTaskSchedulerEventHandler extends TaskSchedulerEventHandler {
-
-    final AtomicBoolean notify = new AtomicBoolean(false);
-    
-    public MockTaskSchedulerEventHandler(AppContext appContext,
-        DAGClientServer clientService, EventHandler eventHandler,
-        ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) {
-      super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI,
-          Lists.newArrayList(new NamedEntityDescriptor("FakeDescriptor", null)), false);
-    }
-
-    @Override
-    protected void instantiateSchedulers(String host, int port, String trackingUrl,
-                                         AppContext appContext) {
-      taskSchedulers[0] = mockTaskScheduler;
-      taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[0]);
-    }
-    
-    @Override
-    protected void notifyForTest() {
-      synchronized (notify) {
-        notify.set(true);
-        notify.notifyAll();
-      }
-    }
-    
-  }
-
-  AppContext mockAppContext;
-  DAGClientServer mockClientService;
-  TestEventHandler mockEventHandler;
-  ContainerSignatureMatcher mockSigMatcher;
-  MockTaskSchedulerEventHandler schedulerHandler;
-  TaskScheduler mockTaskScheduler;
-  AMContainerMap mockAMContainerMap;
-  WebUIService mockWebUIService;
-
-  @Before
-  public void setup() {
-    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
-    doReturn(new Configuration(false)).when(mockAppContext).getAMConf();
-    mockClientService = mock(DAGClientServer.class);
-    mockEventHandler = new TestEventHandler();
-    mockSigMatcher = mock(ContainerSignatureMatcher.class);
-    mockTaskScheduler = mock(TaskScheduler.class);
-    mockAMContainerMap = mock(AMContainerMap.class);
-    mockWebUIService = mock(WebUIService.class);
-    when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap);
-    when(mockClientService.getBindAddress()).thenReturn(new InetSocketAddress(10000));
-    schedulerHandler = new MockTaskSchedulerEventHandler(
-        mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService);
-  }
-
-  @Test(timeout = 5000)
-  public void testSimpleAllocate() throws Exception {
-    Configuration conf = new Configuration(false);
-    schedulerHandler.init(conf);
-    schedulerHandler.start();
-
-    TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class);
-    TezTaskAttemptID mockAttemptId = mock(TezTaskAttemptID.class);
-    when(mockAttemptId.getId()).thenReturn(0);
-    when(mockTaskAttempt.getID()).thenReturn(mockAttemptId);
-    Resource resource = Resource.newInstance(1024, 1);
-    ContainerContext containerContext =
-        new ContainerContext(new HashMap<String, LocalResource>(), new Credentials(),
-            new HashMap<String, String>(), "");
-    int priority = 10;
-    TaskLocationHint locHint = TaskLocationHint.createTaskLocationHint(new HashSet<String>(), null);
-
-    ContainerId mockCId = mock(ContainerId.class);
-    Container container = mock(Container.class);
-    when(container.getId()).thenReturn(mockCId);
-
-    AMContainer mockAMContainer = mock(AMContainer.class);
-    when(mockAMContainer.getContainerId()).thenReturn(mockCId);
-    when(mockAMContainer.getState()).thenReturn(AMContainerState.IDLE);
-
-    when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer);
-
-    AMSchedulerEventTALaunchRequest lr =
-        new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint,
-            priority, containerContext, 0, 0, 0);
-    schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container);
-    assertEquals(2, mockEventHandler.events.size());
-    assertTrue(mockEventHandler.events.get(1) instanceof AMContainerEventAssignTA);
-    AMContainerEventAssignTA assignEvent =
-        (AMContainerEventAssignTA) mockEventHandler.events.get(1);
-    assertEquals(priority, assignEvent.getPriority());
-    assertEquals(mockAttemptId, assignEvent.getTaskAttemptId());
-  }
-
-  @Test (timeout = 5000)
-  public void testTaskBasedAffinity() throws Exception {
-    Configuration conf = new Configuration(false);
-    schedulerHandler.init(conf);
-    schedulerHandler.start();
-
-    TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class);
-    TezTaskAttemptID taId = mock(TezTaskAttemptID.class);
-    String affVertexName = "srcVertex";
-    int affTaskIndex = 1;
-    TaskLocationHint locHint = TaskLocationHint.createTaskLocationHint(affVertexName, affTaskIndex);
-    VertexImpl affVertex = mock(VertexImpl.class);
-    TaskImpl affTask = mock(TaskImpl.class);
-    TaskAttemptImpl affAttempt = mock(TaskAttemptImpl.class);
-    ContainerId affCId = mock(ContainerId.class);
-    when(affVertex.getTotalTasks()).thenReturn(2);
-    when(affVertex.getTask(affTaskIndex)).thenReturn(affTask);
-    when(affTask.getSuccessfulAttempt()).thenReturn(affAttempt);
-    when(affAttempt.getAssignedContainerID()).thenReturn(affCId);
-    when(mockAppContext.getCurrentDAG().getVertex(affVertexName)).thenReturn(affVertex);
-    Resource resource = Resource.newInstance(100, 1);
-    AMSchedulerEventTALaunchRequest event = new AMSchedulerEventTALaunchRequest
-        (taId, resource, null, mockTaskAttempt, locHint, 3, null, 0, 0, 0);
-    schedulerHandler.notify.set(false);
-    schedulerHandler.handle(event);
-    synchronized (schedulerHandler.notify) {
-      while (!schedulerHandler.notify.get()) {
-        schedulerHandler.notify.wait();
-      }
-    }
-    
-    // verify mockTaskAttempt affinitized to expected affCId
-    verify(mockTaskScheduler, times(1)).allocateTask(mockTaskAttempt, resource, affCId,
-        Priority.newInstance(3), null, event);
-    
-    schedulerHandler.stop();
-    schedulerHandler.close();
-  }
-  
-  @Test (timeout = 5000)
-  public void testContainerPreempted() throws IOException {
-    Configuration conf = new Configuration(false);
-    schedulerHandler.init(conf);
-    schedulerHandler.start();
-    
-    String diagnostics = "Container preempted by RM.";
-    TaskAttemptImpl mockTask = mock(TaskAttemptImpl.class);
-    ContainerStatus mockStatus = mock(ContainerStatus.class);
-    ContainerId mockCId = mock(ContainerId.class);
-    AMContainer mockAMContainer = mock(AMContainer.class);
-    when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer);
-    when(mockAMContainer.getContainerId()).thenReturn(mockCId);
-    when(mockStatus.getContainerId()).thenReturn(mockCId);
-    when(mockStatus.getDiagnostics()).thenReturn(diagnostics);
-    when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.PREEMPTED);
-    schedulerHandler.containerCompleted(0, mockTask, mockStatus);
-    assertEquals(1, mockEventHandler.events.size());
-    Event event = mockEventHandler.events.get(0);
-    assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
-    AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
-    assertEquals(mockCId, completedEvent.getContainerId());
-    assertEquals("Container preempted externally. Container preempted by RM.",
-        completedEvent.getDiagnostics());
-    assertTrue(completedEvent.isPreempted());
-    assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION,
-        completedEvent.getTerminationCause());
-    Assert.assertFalse(completedEvent.isDiskFailed());
-
-    schedulerHandler.stop();
-    schedulerHandler.close();
-  }
-  
-  @Test (timeout = 5000)
-  public void testContainerInternalPreempted() throws IOException {
-    Configuration conf = new Configuration(false);
-    schedulerHandler.init(conf);
-    schedulerHandler.start();
-
-    AMContainer mockAmContainer = mock(AMContainer.class);
-    when(mockAmContainer.getTaskSchedulerIdentifier()).thenReturn(0);
-    when(mockAmContainer.getContainerLauncherIdentifier()).thenReturn(0);
-    when(mockAmContainer.getTaskCommunicatorIdentifier()).thenReturn(0);
-    ContainerId mockCId = mock(ContainerId.class);
-    verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId) any());
-    when(mockAMContainerMap.get(mockCId)).thenReturn(mockAmContainer);
-    schedulerHandler.preemptContainer(0, mockCId);
-    verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId);
-    assertEquals(1, mockEventHandler.events.size());
-    Event event = mockEventHandler.events.get(0);
-    assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
-    AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
-    assertEquals(mockCId, completedEvent.getContainerId());
-    assertEquals("Container preempted internally", completedEvent.getDiagnostics());
-    assertTrue(completedEvent.isPreempted());
-    Assert.assertFalse(completedEvent.isDiskFailed());
-    assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
-        completedEvent.getTerminationCause());
-
-    schedulerHandler.stop();
-    schedulerHandler.close();
-  }
-  
-  @Test (timeout = 5000)
-  public void testContainerDiskFailed() throws IOException {
-    Configuration conf = new Configuration(false);
-    schedulerHandler.init(conf);
-    schedulerHandler.start();
-    
-    String diagnostics = "NM disk failed.";
-    TaskAttemptImpl mockTask = mock(TaskAttemptImpl.class);
-    ContainerStatus mockStatus = mock(ContainerStatus.class);
-    ContainerId mockCId = mock(ContainerId.class);
-    AMContainer mockAMContainer = mock(AMContainer.class);
-    when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer);
-    when(mockAMContainer.getContainerId()).thenReturn(mockCId);
-    when(mockStatus.getContainerId()).thenReturn(mockCId);
-    when(mockStatus.getDiagnostics()).thenReturn(diagnostics);
-    when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.DISKS_FAILED);
-    schedulerHandler.containerCompleted(0, mockTask, mockStatus);
-    assertEquals(1, mockEventHandler.events.size());
-    Event event = mockEventHandler.events.get(0);
-    assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
-    AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
-    assertEquals(mockCId, completedEvent.getContainerId());
-    assertEquals("Container disk failed. NM disk failed.",
-        completedEvent.getDiagnostics());
-    Assert.assertFalse(completedEvent.isPreempted());
-    assertTrue(completedEvent.isDiskFailed());
-    assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
-        completedEvent.getTerminationCause());
-
-    schedulerHandler.stop();
-    schedulerHandler.close();
-  }
-
-  @Test (timeout = 5000)
-  public void testContainerExceededPMem() throws IOException {
-    Configuration conf = new Configuration(false);
-    schedulerHandler.init(conf);
-    schedulerHandler.start();
-
-    String diagnostics = "Exceeded Physical Memory";
-    TaskAttemptImpl mockTask = mock(TaskAttemptImpl.class);
-    ContainerStatus mockStatus = mock(ContainerStatus.class);
-    ContainerId mockCId = mock(ContainerId.class);
-    AMContainer mockAMContainer = mock(AMContainer.class);
-    when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer);
-    when(mockAMContainer.getContainerId()).thenReturn(mockCId);
-    when(mockStatus.getContainerId()).thenReturn(mockCId);
-    when(mockStatus.getDiagnostics()).thenReturn(diagnostics);
-    // use -104 rather than ContainerExitStatus.KILLED_EXCEEDED_PMEM because
-    // ContainerExitStatus.KILLED_EXCEEDED_PMEM is only available after hadoop-2.5
-    when(mockStatus.getExitStatus()).thenReturn(-104);
-    schedulerHandler.containerCompleted(0, mockTask, mockStatus);
-    assertEquals(1, mockEventHandler.events.size());
-    Event event = mockEventHandler.events.get(0);
-    assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
-    AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
-    assertEquals(mockCId, completedEvent.getContainerId());
-    assertEquals("Container failed, exitCode=-104. Exceeded Physical Memory",
-        completedEvent.getDiagnostics());
-    Assert.assertFalse(completedEvent.isPreempted());
-    Assert.assertFalse(completedEvent.isDiskFailed());
-    assertEquals(TaskAttemptTerminationCause.CONTAINER_EXITED,
-        completedEvent.getTerminationCause());
-
-    schedulerHandler.stop();
-    schedulerHandler.close();
-  }
-
-  @Test (timeout = 5000)
-  public void testHistoryUrlConf() throws Exception {
-    Configuration conf = schedulerHandler.appContext.getAMConf();
-
-    // ensure history url is empty when timeline server is not the logging class
-    conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://ui-host:9999");
-    assertTrue("".equals(schedulerHandler.getHistoryUrl()));
-
-    // ensure expansion of url happens
-    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
-        "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService");
-    final ApplicationId mockApplicationId = mock(ApplicationId.class);
-    doReturn("TEST_APP_ID").when(mockApplicationId).toString();
-    doReturn(mockApplicationId).when(mockAppContext).getApplicationID();
-    assertTrue("http://ui-host:9999/#/tez-app/TEST_APP_ID"
-        .equals(schedulerHandler.getHistoryUrl()));
-
-    // ensure the trailing / in history url is handled
-    conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://ui-host:9998/");
-    assertTrue("http://ui-host:9998/#/tez-app/TEST_APP_ID"
-        .equals(schedulerHandler.getHistoryUrl()));
-
-    // ensure missing scheme in history url is handled
-    conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "ui-host:9998/");
-    Assert.assertTrue("http://ui-host:9998/#/tez-app/TEST_APP_ID"
-        .equals(schedulerHandler.getHistoryUrl()));
-
-    // handle bad template ex without begining /
-    conf.set(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE,
-        "__HISTORY_URL_BASE__#/somepath");
-    assertTrue("http://ui-host:9998/#/somepath"
-        .equals(schedulerHandler.getHistoryUrl()));
-
-    conf.set(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE,
-        "__HISTORY_URL_BASE__?viewPath=tez-app/__APPLICATION_ID__");
-    conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://localhost/ui/tez");
-    assertTrue("http://localhost/ui/tez?viewPath=tez-app/TEST_APP_ID"
-        .equals(schedulerHandler.getHistoryUrl()));
-
-  }
-
-  @Test(timeout = 5000)
-  public void testNoSchedulerSpecified() throws IOException {
-    try {
-      new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler,
-          mockSigMatcher, mockWebUIService, null, false);
-      fail("Expecting an IllegalStateException with no schedulers specified");
-    } catch (IllegalArgumentException e) {
-    }
-  }
-
-  // Verified via statics
-  @Test(timeout = 5000)
-  public void testCustomTaskSchedulerSetup() throws IOException {
-    Configuration conf = new Configuration(false);
-    conf.set("testkey", "testval");
-    UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(conf);
-
-    String customSchedulerName = "fakeScheduler";
-    List<NamedEntityDescriptor> taskSchedulers = new LinkedList<>();
-    ByteBuffer bb = ByteBuffer.allocate(4);
-    bb.putInt(0, 3);
-    UserPayload userPayload = UserPayload.create(bb);
-    taskSchedulers.add(
-        new NamedEntityDescriptor(customSchedulerName, FakeTaskScheduler.class.getName())
-            .setUserPayload(userPayload));
-    taskSchedulers.add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
-        .setUserPayload(defaultPayload));
-
-    TSEHForMultipleSchedulersTest tseh =
-        new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler,
-            mockSigMatcher, mockWebUIService, taskSchedulers, false);
-
-    tseh.init(conf);
-    tseh.start();
-
-    // Verify that the YARN task scheduler is installed by default
-    assertTrue(tseh.getYarnSchedulerCreated());
-    assertFalse(tseh.getUberSchedulerCreated());
-    assertEquals(2, tseh.getNumCreateInvocations());
-
-    // Verify the order of the schedulers
-    assertEquals(customSchedulerName, tseh.getTaskSchedulerName(0));
-    assertEquals(TezConstants.getTezYarnServicePluginName(), tseh.getTaskSchedulerName(1));
-
-    // Verify the payload setup for the custom task scheduler
-    assertNotNull(tseh.getTaskSchedulerContext(0));
-    assertEquals(bb, tseh.getTaskSchedulerContext(0).getInitialUserPayload().getPayload());
-
-    // Verify the payload on the yarn scheduler
-    assertNotNull(tseh.getTaskSchedulerContext(1));
-    Configuration parsed = TezUtils.createConfFromUserPayload(tseh.getTaskSchedulerContext(1).getInitialUserPayload());
-    assertEquals("testval", parsed.get("testkey"));
-  }
-
-  @Test(timeout = 5000)
-  public void testTaskSchedulerRouting() throws Exception {
-    Configuration conf = new Configuration(false);
-    UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(conf);
-
-    String customSchedulerName = "fakeScheduler";
-    List<NamedEntityDescriptor> taskSchedulers = new LinkedList<>();
-    ByteBuffer bb = ByteBuffer.allocate(4);
-    bb.putInt(0, 3);
-    UserPayload userPayload = UserPayload.create(bb);
-    taskSchedulers.add(
-        new NamedEntityDescriptor(customSchedulerName, FakeTaskScheduler.class.getName())
-            .setUserPayload(userPayload));
-    taskSchedulers.add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
-        .setUserPayload(defaultPayload));
-
-    TSEHForMultipleSchedulersTest tseh =
-        new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler,
-            mockSigMatcher, mockWebUIService, taskSchedulers, false);
-
-    tseh.init(conf);
-    tseh.start();
-
-    // Verify that the YARN task scheduler is installed by default
-    assertTrue(tseh.getYarnSchedulerCreated());
-    assertFalse(tseh.getUberSchedulerCreated());
-    assertEquals(2, tseh.getNumCreateInvocations());
-
-    // Verify the order of the schedulers
-    assertEquals(customSchedulerName, tseh.getTaskSchedulerName(0));
-    assertEquals(TezConstants.getTezYarnServicePluginName(), tseh.getTaskSchedulerName(1));
-
-    verify(tseh.getTestTaskScheduler(0)).initialize();
-    verify(tseh.getTestTaskScheduler(0)).start();
-
-    ApplicationId appId = ApplicationId.newInstance(1000, 1);
-    TezDAGID dagId = TezDAGID.getInstance(appId, 1);
-    TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
-    TezTaskID taskId1 = TezTaskID.getInstance(vertexID, 1);
-    TezTaskAttemptID attemptId11 = TezTaskAttemptID.getInstance(taskId1, 1);
-    TezTaskID taskId2 = TezTaskID.getInstance(vertexID, 2);
-    TezTaskAttemptID attemptId21 = TezTaskAttemptID.getInstance(taskId2, 1);
-
-    Resource resource = Resource.newInstance(1024, 1);
-
-    TaskAttempt mockTaskAttempt1 = mock(TaskAttempt.class);
-    TaskAttempt mockTaskAttempt2 = mock(TaskAttempt.class);
-
-    AMSchedulerEventTALaunchRequest launchRequest1 =
-        new AMSchedulerEventTALaunchRequest(attemptId11, resource, mock(TaskSpec.class),
-            mockTaskAttempt1, mock(TaskLocationHint.class), 1, mock(ContainerContext.class), 0, 0,
-            0);
-
-    tseh.handle(launchRequest1);
-
-    verify(tseh.getTestTaskScheduler(0)).allocateTask(eq(mockTaskAttempt1), eq(resource),
-        any(String[].class), any(String[].class), any(Priority.class), any(Object.class),
-        eq(launchRequest1));
-
-    AMSchedulerEventTALaunchRequest launchRequest2 =
-        new AMSchedulerEventTALaunchRequest(attemptId21, resource, mock(TaskSpec.class),
-            mockTaskAttempt2, mock(TaskLocationHint.class), 1, mock(ContainerContext.class), 1, 0,
-            0);
-    tseh.handle(launchRequest2);
-    verify(tseh.getTestTaskScheduler(1)).allocateTask(eq(mockTaskAttempt2), eq(resource),
-        any(String[].class), any(String[].class), any(Priority.class), any(Object.class),
-        eq(launchRequest2));
-  }
-
-  private static class TSEHForMultipleSchedulersTest extends TaskSchedulerEventHandler {
-
-    private final TaskScheduler yarnTaskScheduler;
-    private final TaskScheduler uberTaskScheduler;
-    private final AtomicBoolean uberSchedulerCreated = new AtomicBoolean(false);
-    private final AtomicBoolean yarnSchedulerCreated = new AtomicBoolean(false);
-    private final AtomicInteger numCreateInvocations = new AtomicInteger(0);
-    private final Set<Integer> seenSchedulers = new HashSet<>();
-    private final List<TaskSchedulerContext> taskSchedulerContexts = new LinkedList<>();
-    private final List<String> taskSchedulerNames = new LinkedList<>();
-    private final List<TaskScheduler> testTaskSchedulers = new LinkedList<>();
-
-    public TSEHForMultipleSchedulersTest(AppContext appContext,
-                                         DAGClientServer clientService,
-                                         EventHandler eventHandler,
-                                         ContainerSignatureMatcher containerSignatureMatcher,
-                                         WebUIService webUI,
-                                         List<NamedEntityDescriptor> schedulerDescriptors,
-                                         boolean isPureLocalMode) {
-      super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI,
-          schedulerDescriptors, isPureLocalMode);
-      yarnTaskScheduler = mock(TaskScheduler.class);
-      uberTaskScheduler = mock(TaskScheduler.class);
-    }
-
-    @Override
-    TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
-                                      AppContext appContext,
-                                      NamedEntityDescriptor taskSchedulerDescriptor,
-                                      long customAppIdIdentifier,
-                                      int schedulerId) {
-
-      numCreateInvocations.incrementAndGet();
-      boolean added = seenSchedulers.add(schedulerId);
-      assertTrue("Cannot add multiple schedulers with the same schedulerId", added);
-      taskSchedulerNames.add(taskSchedulerDescriptor.getEntityName());
-      return super.createTaskScheduler(host, port, trackingUrl, appContext, taskSchedulerDescriptor,
-          customAppIdIdentifier, schedulerId);
-    }
-
-    @Override
-    TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext rawContext) {
-      // Avoid wrapping in threads
-      return rawContext;
-    }
-
-    @Override
-    TaskScheduler createYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext, int schedulerId) {
-      taskSchedulerContexts.add(taskSchedulerContext);
-      testTaskSchedulers.add(yarnTaskScheduler);
-      yarnSchedulerCreated.set(true);
-      return yarnTaskScheduler;
-    }
-
-    @Override
-    TaskScheduler createUberTaskScheduler(TaskSchedulerContext taskSchedulerContext, int schedulerId) {
-      taskSchedulerContexts.add(taskSchedulerContext);
-      uberSchedulerCreated.set(true);
-      testTaskSchedulers.add(yarnTaskScheduler);
-      return uberTaskScheduler;
-    }
-
-    @Override
-    TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext,
-                                            NamedEntityDescriptor taskSchedulerDescriptor, int schedulerId) {
-      taskSchedulerContexts.add(taskSchedulerContext);
-      TaskScheduler taskScheduler = spy(super.createCustomTaskScheduler(taskSchedulerContext, taskSchedulerDescriptor, schedulerId));
-      testTaskSchedulers.add(taskScheduler);
-      return taskScheduler;
-    }
-
-    @Override
-    // Inline handling of events.
-    public void handle(AMSchedulerEvent event) {
-      handleEvent(event);
-    }
-
-    public boolean getUberSchedulerCreated() {
-      return uberSchedulerCreated.get();
-    }
-
-    public boolean getYarnSchedulerCreated() {
-      return yarnSchedulerCreated.get();
-    }
-
-    public int getNumCreateInvocations() {
-      return numCreateInvocations.get();
-    }
-
-    public TaskSchedulerContext getTaskSchedulerContext(int schedulerId) {
-      return taskSchedulerContexts.get(schedulerId);
-    }
-
-    public String getTaskSchedulerName(int schedulerId) {
-      return taskSchedulerNames.get(schedulerId);
-    }
-
-    public TaskScheduler getTestTaskScheduler(int schedulerId) {
-      return testTaskSchedulers.get(schedulerId);
-    }
-  }
-
-  public static class FakeTaskScheduler extends TaskScheduler {
-
-    public FakeTaskScheduler(
-        TaskSchedulerContext taskSchedulerContext) {
-      super(taskSchedulerContext);
-    }
-
-    @Override
-    public Resource getAvailableResources() {
-      return null;
-    }
-
-    @Override
-    public int getClusterNodeCount() {
-      return 0;
-    }
-
-    @Override
-    public void dagComplete() {
-
-    }
-
-    @Override
-    public Resource getTotalResources() {
-      return null;
-    }
-
-    @Override
-    public void blacklistNode(NodeId nodeId) {
-
-    }
-
-    @Override
-    public void unblacklistNode(NodeId nodeId) {
-
-    }
-
-    @Override
-    public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
-                             Priority priority, Object containerSignature, Object clientCookie) {
-
-    }
-
-    @Override
-    public void allocateTask(Object task, Resource capability, ContainerId containerId,
-                             Priority priority, Object containerSignature, Object clientCookie) {
-
-    }
-
-    @Override
-    public boolean deallocateTask(Object task, boolean taskSucceeded,
-                                  TaskAttemptEndReason endReason,
-                                  String diagnostics) {
-      return false;
-    }
-
-    @Override
-    public Object deallocateContainer(ContainerId containerId) {
-      return null;
-    }
-
-    @Override
-    public void setShouldUnregister() {
-
-    }
-
-    @Override
-    public boolean hasUnregistered() {
-      return false;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index c13ca5a..b1bc491 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -125,19 +125,19 @@ class TestTaskSchedulerHelpers {
   }
   
   // Overrides start / stop. Will be controlled without the extra event handling thread.
-  static class TaskSchedulerEventHandlerForTest extends
-      TaskSchedulerEventHandler {
+  static class TaskSchedulerManagerForTest extends
+      TaskSchedulerManager {
 
     private TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync;
     private ContainerSignatureMatcher containerSignatureMatcher;
     private UserPayload defaultPayload;
 
     @SuppressWarnings("rawtypes")
-    public TaskSchedulerEventHandlerForTest(AppContext appContext,
-        EventHandler eventHandler,
-        TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
-        ContainerSignatureMatcher containerSignatureMatcher,
-        UserPayload defaultPayload) {
+    public TaskSchedulerManagerForTest(AppContext appContext,
+                                       EventHandler eventHandler,
+                                       TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
+                                       ContainerSignatureMatcher containerSignatureMatcher,
+                                       UserPayload defaultPayload) {
       super(appContext, null, eventHandler, containerSignatureMatcher, null,
           Lists.newArrayList(new NamedEntityDescriptor("FakeScheduler", null)),
           false);