You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/26 01:48:50 UTC

[3/7] tez git commit: TEZ-2708. Rename classes and variables post TEZ-2003 changes. (sseth)

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 6ee741a..a8ba445 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -117,7 +117,7 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.MockClock;
 import org.apache.tez.dag.app.TaskAttemptEventInfo;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.RootInputInitializerManager;
@@ -153,7 +153,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
 import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
 import org.apache.tez.dag.app.dag.impl.TestVertexImpl.VertexManagerWithException.VMExceptionLocation;
-import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
+import org.apache.tez.dag.app.rm.TaskSchedulerManager;
 import org.apache.tez.dag.app.rm.container.AMContainerMap;
 import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
 import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -161,7 +161,6 @@ import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
-import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
 import org.apache.tez.dag.records.TaskAttemptTerminationCause;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -228,7 +227,7 @@ public class TestVertexImpl {
   private Map<String, VertexImpl> vertices;
   private Map<TezVertexID, VertexImpl> vertexIdMap;
   private DrainDispatcher dispatcher;
-  private TaskAttemptListener taskAttemptListener;
+  private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
   private Clock clock = new SystemClock();
   private TaskHeartbeatHandler thh;
   private AppContext appContext;
@@ -2077,16 +2076,16 @@ public class TestVertexImpl {
       if (useCustomInitializer) {
         if (customInitializer == null) {
           v = new VertexImplWithControlledInitializerManager(vertexId, vPlan, vPlan.getName(), conf,
-              dispatcher.getEventHandler(), taskAttemptListener,
+              dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
               clock, thh, appContext, locationHint, dispatcher, updateTracker);
         } else {
           v = new VertexImplWithRunningInputInitializer(vertexId, vPlan, vPlan.getName(), conf,
-              dispatcher.getEventHandler(), taskAttemptListener,
+              dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
               clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker);
         }
       } else {
         v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
-            dispatcher.getEventHandler(), taskAttemptListener,
+            dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
             clock, thh, true, appContext, locationHint, vertexGroups, taskSpecificLaunchCmdOption,
             updateTracker);
       }
@@ -2162,7 +2161,7 @@ public class TestVertexImpl {
     appContext = mock(AppContext.class);
     thh = mock(TaskHeartbeatHandler.class);
     historyEventHandler = mock(HistoryEventHandler.class);
-    TaskSchedulerEventHandler taskScheduler = mock(TaskSchedulerEventHandler.class);
+    TaskSchedulerManager taskScheduler = mock(TaskSchedulerManager.class);
     UserGroupInformation ugi;
     try {
       ugi = UserGroupInformation.getCurrentUser();
@@ -3227,7 +3226,7 @@ public class TestVertexImpl {
     when(container.getNodeId()).thenReturn(nid);
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
     AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
         new ContainerContextMatcher(), appContext);
     containers.addContainerIfNew(container, 0, 0, 0);
     doReturn(containers).when(appContext).getAllContainers();
@@ -3262,7 +3261,7 @@ public class TestVertexImpl {
     when(container.getNodeId()).thenReturn(nid);
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
     AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
         new ContainerContextMatcher(), appContext);
     containers.addContainerIfNew(container, 0, 0, 0);
     doReturn(containers).when(appContext).getAllContainers();
@@ -3298,7 +3297,7 @@ public class TestVertexImpl {
     when(container.getNodeId()).thenReturn(nid);
     when(container.getNodeHttpAddress()).thenReturn("localhost:0");
     AMContainerMap containers = new AMContainerMap(
-        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
         new ContainerContextMatcher(), appContext);
     containers.addContainerIfNew(container, 0, 0, 0);
     doReturn(containers).when(appContext).getAllContainers();
@@ -5179,7 +5178,7 @@ public class TestVertexImpl {
       vId = TezVertexID.getInstance(invalidDagId, 1);
       VertexPlan vPlan = invalidDagPlan.getVertex(0);
       VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
-          dispatcher.getEventHandler(), taskAttemptListener,
+          dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
           clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
           updateTracker);
       v.setInputVertices(new HashMap());
@@ -5208,7 +5207,7 @@ public class TestVertexImpl {
                                                  VertexPlan vertexPlan, String vertexName,
                                                  Configuration conf,
                                                  EventHandler eventHandler,
-                                                 TaskAttemptListener taskAttemptListener,
+                                                 TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
                                                  Clock clock, TaskHeartbeatHandler thh,
                                                  AppContext appContext,
                                                  VertexLocationHint vertexLocationHint,
@@ -5216,7 +5215,7 @@ public class TestVertexImpl {
                                                  InputInitializer presetInitializer,
                                                  StateChangeNotifier updateTracker) {
       super(vertexId, vertexPlan, vertexName, conf, eventHandler,
-          taskAttemptListener, clock, thh, true,
+          taskCommunicatorManagerInterface, clock, thh, true,
           appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
           updateTracker);
       this.presetInitializer = presetInitializer;
@@ -5248,14 +5247,14 @@ public class TestVertexImpl {
                                                       VertexPlan vertexPlan, String vertexName,
                                                       Configuration conf,
                                                       EventHandler eventHandler,
-                                                      TaskAttemptListener taskAttemptListener,
+                                                      TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
                                                       Clock clock, TaskHeartbeatHandler thh,
                                                       AppContext appContext,
                                                       VertexLocationHint vertexLocationHint,
                                                       DrainDispatcher dispatcher,
                                                       StateChangeNotifier updateTracker) {
       super(vertexId, vertexPlan, vertexName, conf, eventHandler,
-          taskAttemptListener, clock, thh, true,
+          taskCommunicatorManagerInterface, clock, thh, true,
           appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
           updateTracker);
       this.dispatcher = dispatcher;

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
index 0e34f68..8bd288a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
@@ -54,7 +54,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.DAGAppMaster;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
 import org.apache.tez.dag.app.dag.StateChangeNotifier;
@@ -517,7 +517,7 @@ public class TestVertexImpl2 {
 
       vertex =
           new VertexImpl(TezVertexID.fromString("vertex_1418197758681_0001_1_00"), vertexPlan,
-              "testvertex", conf, mock(EventHandler.class), mock(TaskAttemptListener.class),
+              "testvertex", conf, mock(EventHandler.class), mock(TaskCommunicatorManagerInterface.class),
               mock(Clock.class), mock(TaskHeartbeatHandler.class), false, mockAppContext,
               VertexLocationHint.create(new LinkedList<TaskLocationHint>()), null,
               new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class));

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index 0f532fb..e389d64 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -55,7 +55,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ClusterInfo;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.Task;
@@ -434,7 +434,7 @@ public class TestVertexRecovery {
     DAGPlan dagPlan = createDAGPlan();
     dag =
         new DAGImpl(dagId, new Configuration(), dagPlan,
-            dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
+            dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class),
             new Credentials(), new SystemClock(), user,
             mock(TaskHeartbeatHandler.class), mockAppContext);
     when(mockAppContext.getCurrentDAG()).thenReturn(dag);
@@ -544,7 +544,7 @@ public class TestVertexRecovery {
     DAGPlan dagPlan = createDAGPlanSingleVertex();
     dag =
         new DAGImpl(dagId, new Configuration(), dagPlan,
-            dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
+            dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class),
             new Credentials(), new SystemClock(), user,
             mock(TaskHeartbeatHandler.class), mockAppContext);
     when(mockAppContext.getCurrentDAG()).thenReturn(dag);
@@ -924,7 +924,7 @@ public class TestVertexRecovery {
     DAGPlan dagPlan = createDAGPlanMR();
     dag =
         new DAGImpl(dagId, new Configuration(), dagPlan,
-            dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
+            dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class),
             new Credentials(), new SystemClock(), user,
             mock(TaskHeartbeatHandler.class), mockAppContext);
     when(mockAppContext.getCurrentDAG()).thenReturn(dag);
@@ -965,7 +965,7 @@ public class TestVertexRecovery {
     DAGPlan dagPlan = createDAGPlan();
     dag =
         new DAGImpl(dagId, new Configuration(), dagPlan,
-            dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
+            dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class),
             new Credentials(), new SystemClock(), user,
             mock(TaskHeartbeatHandler.class), mockAppContext);
     when(mockAppContext.getCurrentDAG()).thenReturn(dag);

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
new file mode 100644
index 0000000..4b931d4
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
+import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestContainerLauncherManager {
+
+  @Before
+  @After
+  public void reset() {
+    ContainerLaucherRouterForMultipleLauncherTest.reset(); // static tracking state: clear before and after each test
+  }
+
+  @Test(timeout = 5000)
+  public void testNoLaunchersSpecified() throws IOException {
+    // Building the manager with a null launcher descriptor list is expected to be rejected.
+    AppContext appContext = mock(AppContext.class);
+    TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
+
+    try {
+      // The working directory and the descriptor list are both passed as null here.
+      new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null, null,
+          false);
+      fail("Expecting a failure without any launchers being specified");
+    } catch (IllegalArgumentException e) {
+      // Expected outcome: construction failed because no launchers were specified.
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testCustomLauncherSpecified() throws IOException {
+    Configuration conf = new Configuration(false);
+
+    AppContext appContext = mock(AppContext.class);
+    TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
+    // Describe one custom launcher whose payload is a 4-byte buffer holding the int 3.
+    String customLauncherName = "customLauncher";
+    List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>();
+    ByteBuffer bb = ByteBuffer.allocate(4);
+    bb.putInt(0, 3);
+    UserPayload customPayload = UserPayload.create(bb);
+    launcherDescriptors.add(
+        new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName())
+            .setUserPayload(customPayload));
+
+    ContainerLaucherRouterForMultipleLauncherTest clr =
+        new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null,
+            launcherDescriptors,
+            true);
+    try {
+      clr.init(conf);
+      clr.start();
+      // Only the custom launcher should exist, and it should expose the payload set above.
+      assertEquals(1, clr.getNumContainerLaunchers());
+      assertFalse(clr.getYarnContainerLauncherCreated());
+      assertFalse(clr.getUberContainerLauncherCreated());
+      assertEquals(customLauncherName, clr.getContainerLauncherName(0));
+      assertEquals(bb, clr.getContainerLauncherContext(0).getInitialUserPayload().getPayload());
+    } finally {
+      clr.stop();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testMultipleContainerLaunchers() throws IOException {
+    Configuration conf = new Configuration(false);
+    conf.set("testkey", "testvalue");
+    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
+
+    AppContext appContext = mock(AppContext.class);
+    TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
+    // Describe two launchers: a custom one first, then one named after the Tez YARN service plugin.
+    String customLauncherName = "customLauncher";
+    List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>();
+    ByteBuffer bb = ByteBuffer.allocate(4);
+    bb.putInt(0, 3);
+    UserPayload customPayload = UserPayload.create(bb);
+    launcherDescriptors.add(
+        new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName())
+            .setUserPayload(customPayload));
+    launcherDescriptors
+        .add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+            .setUserPayload(userPayload));
+
+    ContainerLaucherRouterForMultipleLauncherTest clr =
+        new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null,
+            launcherDescriptors,
+            true);
+    try {
+      clr.init(conf);
+      clr.start();
+      // Both described launchers should exist (custom at 0, YARN at 1); the uber launcher should not.
+      assertEquals(2, clr.getNumContainerLaunchers());
+      assertTrue(clr.getYarnContainerLauncherCreated());
+      assertFalse(clr.getUberContainerLauncherCreated());
+      assertEquals(customLauncherName, clr.getContainerLauncherName(0));
+      assertEquals(bb, clr.getContainerLauncherContext(0).getInitialUserPayload().getPayload());
+      // The second launcher's payload must decode back to the configuration built at the top.
+      assertEquals(TezConstants.getTezYarnServicePluginName(), clr.getContainerLauncherName(1));
+      Configuration confParsed = TezUtils
+          .createConfFromUserPayload(clr.getContainerLauncherContext(1).getInitialUserPayload());
+      assertEquals("testvalue", confParsed.get("testkey"));
+    } finally {
+      clr.stop();
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testEventRouting() throws Exception {
+    Configuration conf = new Configuration(false);
+    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
+
+    AppContext appContext = mock(AppContext.class);
+    TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
+
+    String customLauncherName = "customLauncher";
+    List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>();
+    ByteBuffer bb = ByteBuffer.allocate(4);
+    bb.putInt(0, 3);
+    UserPayload customPayload = UserPayload.create(bb);
+    launcherDescriptors.add(
+        new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName())
+            .setUserPayload(customPayload));
+    launcherDescriptors
+        .add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+            .setUserPayload(userPayload));
+
+    ContainerLaucherRouterForMultipleLauncherTest clr =
+        new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null,
+            launcherDescriptors,
+            true);
+    try {
+      clr.init(conf);
+      clr.start();
+
+      assertEquals(2, clr.getNumContainerLaunchers());
+      assertTrue(clr.getYarnContainerLauncherCreated());
+      assertFalse(clr.getUberContainerLauncherCreated());
+      assertEquals(customLauncherName, clr.getContainerLauncherName(0));
+      assertEquals(TezConstants.getTezYarnServicePluginName(), clr.getContainerLauncherName(1));
+      // By this point both launchers must have been initialized and started.
+      verify(clr.getTestContainerLauncher(0)).initialize();
+      verify(clr.getTestContainerLauncher(0)).start();
+      verify(clr.getTestContainerLauncher(1)).initialize();
+      verify(clr.getTestContainerLauncher(1)).start();
+
+      ContainerLaunchContext clc1 = mock(ContainerLaunchContext.class);
+      Container container1 = mock(Container.class);
+
+      ContainerLaunchContext clc2 = mock(ContainerLaunchContext.class);
+      Container container2 = mock(Container.class);
+      // NOTE(review): the events differ in their third argument (0 vs 1); presumably the launcher index - confirm.
+      ContainerLauncherLaunchRequestEvent launchRequestEvent1 =
+          new ContainerLauncherLaunchRequestEvent(clc1, container1, 0, 0, 0);
+      ContainerLauncherLaunchRequestEvent launchRequestEvent2 =
+          new ContainerLauncherLaunchRequestEvent(clc2, container2, 1, 0, 0);
+      // The first event is expected to be delivered to launcher 0 with the first launch context.
+      clr.handle(launchRequestEvent1);
+
+
+      ArgumentCaptor<ContainerLaunchRequest> captor =
+          ArgumentCaptor.forClass(ContainerLaunchRequest.class);
+      verify(clr.getTestContainerLauncher(0)).launchContainer(captor.capture());
+      assertEquals(1, captor.getAllValues().size());
+      ContainerLaunchRequest launchRequest1 = captor.getValue();
+      assertEquals(clc1, launchRequest1.getContainerLaunchContext());
+      // The second event is expected to be delivered to launcher 1 with the second launch context.
+      clr.handle(launchRequestEvent2);
+      captor = ArgumentCaptor.forClass(ContainerLaunchRequest.class);
+      verify(clr.getTestContainerLauncher(1)).launchContainer(captor.capture());
+      assertEquals(1, captor.getAllValues().size());
+      ContainerLaunchRequest launchRequest2 = captor.getValue();
+      assertEquals(clc2, launchRequest2.getContainerLaunchContext());
+
+    } finally {
+      clr.stop();
+      verify(clr.getTestContainerLauncher(0)).shutdown();
+      verify(clr.getTestContainerLauncher(1)).shutdown();
+    }
+  }
+
+  private static class ContainerLaucherRouterForMultipleLauncherTest
+      extends ContainerLauncherManager {
+    // Test subclass of ContainerLauncherManager that records which launchers get created.
+    // All variables are set up as static since the methods being overridden are invoked by the
+    // ContainerLauncherManager ctor, and regular variables will not be initialized at that point.
+    private static final AtomicInteger numContainerLaunchers = new AtomicInteger(0);
+    private static final Set<Integer> containerLauncherIndices = new HashSet<>();
+    private static final ContainerLauncher yarnContainerLauncher = mock(ContainerLauncher.class);
+    private static final ContainerLauncher uberContainerlauncher = mock(ContainerLauncher.class);
+    private static final AtomicBoolean yarnContainerLauncherCreated = new AtomicBoolean(false);
+    private static final AtomicBoolean uberContainerLauncherCreated = new AtomicBoolean(false);
+
+    private static final List<ContainerLauncherContext> containerLauncherContexts =
+        new LinkedList<>();
+    private static final List<String> containerLauncherNames = new LinkedList<>();
+    private static final List<ContainerLauncher> testContainerLaunchers = new LinkedList<>();
+
+    // Restores every static counter, flag and list above to its initial (empty) state.
+    public static void reset() {
+      numContainerLaunchers.set(0);
+      containerLauncherIndices.clear();
+      yarnContainerLauncherCreated.set(false);
+      uberContainerLauncherCreated.set(false);
+      containerLauncherContexts.clear();
+      containerLauncherNames.clear();
+      testContainerLaunchers.clear();
+    }
+
+    public ContainerLaucherRouterForMultipleLauncherTest(AppContext context,
+                                                         TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
+                                                         String workingDirectory,
+                                                         List<NamedEntityDescriptor> containerLauncherDescriptors,
+                                                         boolean isPureLocalMode) throws
+        UnknownHostException {
+      super(context, taskCommunicatorManagerInterface, workingDirectory,
+          containerLauncherDescriptors, isPureLocalMode);
+    }
+    // Records the launcher's index, name and context, then delegates to the superclass.
+    @Override
+    ContainerLauncher createContainerLauncher(NamedEntityDescriptor containerLauncherDescriptor,
+                                              AppContext context,
+                                              ContainerLauncherContext containerLauncherContext,
+                                              TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
+                                              String workingDirectory,
+                                              int containerLauncherIndex,
+                                              boolean isPureLocalMode) {
+      numContainerLaunchers.incrementAndGet();
+      boolean added = containerLauncherIndices.add(containerLauncherIndex);
+      assertTrue("Cannot add multiple launchers with the same index", added);
+      containerLauncherNames.add(containerLauncherDescriptor.getEntityName());
+      containerLauncherContexts.add(containerLauncherContext);
+      return super
+          .createContainerLauncher(containerLauncherDescriptor, context, containerLauncherContext,
+              taskCommunicatorManagerInterface, workingDirectory, containerLauncherIndex, isPureLocalMode);
+    }
+    // Flags that a YARN launcher was requested and hands back the shared mock.
+    @Override
+    ContainerLauncher createYarnContainerLauncher(
+        ContainerLauncherContext containerLauncherContext) {
+      yarnContainerLauncherCreated.set(true);
+      testContainerLaunchers.add(yarnContainerLauncher);
+      return yarnContainerLauncher;
+    }
+    // Flags that an uber launcher was requested and hands back the shared mock.
+    @Override
+    ContainerLauncher createUberContainerLauncher(ContainerLauncherContext containerLauncherContext,
+                                                  AppContext context,
+                                                  TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
+                                                  String workingDirectory,
+                                                  boolean isPureLocalMode) {
+      uberContainerLauncherCreated.set(true);
+      testContainerLaunchers.add(uberContainerlauncher);
+      return uberContainerlauncher;
+    }
+    // Wraps the superclass-created custom launcher in a Mockito spy so tests can verify calls on it.
+    @Override
+    ContainerLauncher createCustomContainerLauncher(
+        ContainerLauncherContext containerLauncherContext,
+        NamedEntityDescriptor containerLauncherDescriptor) {
+      ContainerLauncher spyLauncher = spy(super.createCustomContainerLauncher(
+          containerLauncherContext, containerLauncherDescriptor));
+      testContainerLaunchers.add(spyLauncher);
+      return spyLauncher;
+    }
+
+    public int getNumContainerLaunchers() {
+      return numContainerLaunchers.get();
+    }
+
+    public boolean getYarnContainerLauncherCreated() {
+      return yarnContainerLauncherCreated.get();
+    }
+
+    public boolean getUberContainerLauncherCreated() {
+      return uberContainerLauncherCreated.get();
+    }
+
+    public String getContainerLauncherName(int containerLauncherIndex) {
+      return containerLauncherNames.get(containerLauncherIndex);
+    }
+
+    public ContainerLauncher getTestContainerLauncher(int containerLauncherIndex) {
+      return testContainerLaunchers.get(containerLauncherIndex);
+    }
+
+    public ContainerLauncherContext getContainerLauncherContext(int containerLauncherIndex) {
+      return containerLauncherContexts.get(containerLauncherIndex);
+    }
+  }
+
+  private static class FakeContainerLauncher extends ContainerLauncher {
+    // Minimal ContainerLauncher named by the tests' custom launcher descriptors; both operations are no-ops.
+    public FakeContainerLauncher(
+        ContainerLauncherContext containerLauncherContext) {
+      super(containerLauncherContext);
+    }
+
+    @Override
+    public void launchContainer(ContainerLaunchRequest launchRequest) {
+      // Intentionally empty.
+    }
+
+    @Override
+    public void stopContainer(ContainerStopRequest stopRequest) {
+      // Intentionally empty.
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java
deleted file mode 100644
index d0caf8c..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.launcher;
-
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
-import org.apache.tez.serviceplugins.api.ContainerLauncher;
-import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
-import org.apache.tez.serviceplugins.api.ContainerStopRequest;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-public class TestContainerLauncherRouter {
-
-  @Before
-  @After
-  public void reset() {
-    ContainerLaucherRouterForMultipleLauncherTest.reset();
-  }
-
-  @Test(timeout = 5000)
-  public void testNoLaunchersSpecified() throws IOException {
-
-    AppContext appContext = mock(AppContext.class);
-    TaskAttemptListener tal = mock(TaskAttemptListener.class);
-
-    try {
-
-      new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null, null,
-          false);
-      fail("Expecting a failure without any launchers being specified");
-    } catch (IllegalArgumentException e) {
-
-    }
-  }
-
-  @Test(timeout = 5000)
-  public void testCustomLauncherSpecified() throws IOException {
-    Configuration conf = new Configuration(false);
-
-    AppContext appContext = mock(AppContext.class);
-    TaskAttemptListener tal = mock(TaskAttemptListener.class);
-
-    String customLauncherName = "customLauncher";
-    List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>();
-    ByteBuffer bb = ByteBuffer.allocate(4);
-    bb.putInt(0, 3);
-    UserPayload customPayload = UserPayload.create(bb);
-    launcherDescriptors.add(
-        new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName())
-            .setUserPayload(customPayload));
-
-    ContainerLaucherRouterForMultipleLauncherTest clr =
-        new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null,
-            launcherDescriptors,
-            true);
-    try {
-      clr.init(conf);
-      clr.start();
-
-      assertEquals(1, clr.getNumContainerLaunchers());
-      assertFalse(clr.getYarnContainerLauncherCreated());
-      assertFalse(clr.getUberContainerLauncherCreated());
-      assertEquals(customLauncherName, clr.getContainerLauncherName(0));
-      assertEquals(bb, clr.getContainerLauncherContext(0).getInitialUserPayload().getPayload());
-    } finally {
-      clr.stop();
-    }
-  }
-
-  @Test(timeout = 5000)
-  public void testMultipleContainerLaunchers() throws IOException {
-    Configuration conf = new Configuration(false);
-    conf.set("testkey", "testvalue");
-    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
-
-    AppContext appContext = mock(AppContext.class);
-    TaskAttemptListener tal = mock(TaskAttemptListener.class);
-
-    String customLauncherName = "customLauncher";
-    List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>();
-    ByteBuffer bb = ByteBuffer.allocate(4);
-    bb.putInt(0, 3);
-    UserPayload customPayload = UserPayload.create(bb);
-    launcherDescriptors.add(
-        new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName())
-            .setUserPayload(customPayload));
-    launcherDescriptors
-        .add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
-            .setUserPayload(userPayload));
-
-    ContainerLaucherRouterForMultipleLauncherTest clr =
-        new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null,
-            launcherDescriptors,
-            true);
-    try {
-      clr.init(conf);
-      clr.start();
-
-      assertEquals(2, clr.getNumContainerLaunchers());
-      assertTrue(clr.getYarnContainerLauncherCreated());
-      assertFalse(clr.getUberContainerLauncherCreated());
-      assertEquals(customLauncherName, clr.getContainerLauncherName(0));
-      assertEquals(bb, clr.getContainerLauncherContext(0).getInitialUserPayload().getPayload());
-
-      assertEquals(TezConstants.getTezYarnServicePluginName(), clr.getContainerLauncherName(1));
-      Configuration confParsed = TezUtils
-          .createConfFromUserPayload(clr.getContainerLauncherContext(1).getInitialUserPayload());
-      assertEquals("testvalue", confParsed.get("testkey"));
-    } finally {
-      clr.stop();
-    }
-  }
-
-  @Test(timeout = 5000)
-  public void testEventRouting() throws Exception {
-    Configuration conf = new Configuration(false);
-    UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
-
-    AppContext appContext = mock(AppContext.class);
-    TaskAttemptListener tal = mock(TaskAttemptListener.class);
-
-    String customLauncherName = "customLauncher";
-    List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>();
-    ByteBuffer bb = ByteBuffer.allocate(4);
-    bb.putInt(0, 3);
-    UserPayload customPayload = UserPayload.create(bb);
-    launcherDescriptors.add(
-        new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName())
-            .setUserPayload(customPayload));
-    launcherDescriptors
-        .add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
-            .setUserPayload(userPayload));
-
-    ContainerLaucherRouterForMultipleLauncherTest clr =
-        new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null,
-            launcherDescriptors,
-            true);
-    try {
-      clr.init(conf);
-      clr.start();
-
-      assertEquals(2, clr.getNumContainerLaunchers());
-      assertTrue(clr.getYarnContainerLauncherCreated());
-      assertFalse(clr.getUberContainerLauncherCreated());
-      assertEquals(customLauncherName, clr.getContainerLauncherName(0));
-      assertEquals(TezConstants.getTezYarnServicePluginName(), clr.getContainerLauncherName(1));
-
-      verify(clr.getTestContainerLauncher(0)).initialize();
-      verify(clr.getTestContainerLauncher(0)).start();
-      verify(clr.getTestContainerLauncher(1)).initialize();
-      verify(clr.getTestContainerLauncher(1)).start();
-
-      ContainerLaunchContext clc1 = mock(ContainerLaunchContext.class);
-      Container container1 = mock(Container.class);
-
-      ContainerLaunchContext clc2 = mock(ContainerLaunchContext.class);
-      Container container2 = mock(Container.class);
-
-      NMCommunicatorLaunchRequestEvent launchRequestEvent1 =
-          new NMCommunicatorLaunchRequestEvent(clc1, container1, 0, 0, 0);
-      NMCommunicatorLaunchRequestEvent launchRequestEvent2 =
-          new NMCommunicatorLaunchRequestEvent(clc2, container2, 1, 0, 0);
-
-      clr.handle(launchRequestEvent1);
-
-
-      ArgumentCaptor<ContainerLaunchRequest> captor =
-          ArgumentCaptor.forClass(ContainerLaunchRequest.class);
-      verify(clr.getTestContainerLauncher(0)).launchContainer(captor.capture());
-      assertEquals(1, captor.getAllValues().size());
-      ContainerLaunchRequest launchRequest1 = captor.getValue();
-      assertEquals(clc1, launchRequest1.getContainerLaunchContext());
-
-      clr.handle(launchRequestEvent2);
-      captor = ArgumentCaptor.forClass(ContainerLaunchRequest.class);
-      verify(clr.getTestContainerLauncher(1)).launchContainer(captor.capture());
-      assertEquals(1, captor.getAllValues().size());
-      ContainerLaunchRequest launchRequest2 = captor.getValue();
-      assertEquals(clc2, launchRequest2.getContainerLaunchContext());
-
-    } finally {
-      clr.stop();
-      verify(clr.getTestContainerLauncher(0)).shutdown();
-      verify(clr.getTestContainerLauncher(1)).shutdown();
-    }
-  }
-
-  private static class ContainerLaucherRouterForMultipleLauncherTest
-      extends ContainerLauncherRouter {
-
-    // All variables setup as static since methods being overridden are invoked by the ContainerLauncherRouter ctor,
-    // and regular variables will not be initialized at this point.
-    private static final AtomicInteger numContainerLaunchers = new AtomicInteger(0);
-    private static final Set<Integer> containerLauncherIndices = new HashSet<>();
-    private static final ContainerLauncher yarnContainerLauncher = mock(ContainerLauncher.class);
-    private static final ContainerLauncher uberContainerlauncher = mock(ContainerLauncher.class);
-    private static final AtomicBoolean yarnContainerLauncherCreated = new AtomicBoolean(false);
-    private static final AtomicBoolean uberContainerLauncherCreated = new AtomicBoolean(false);
-
-    private static final List<ContainerLauncherContext> containerLauncherContexts =
-        new LinkedList<>();
-    private static final List<String> containerLauncherNames = new LinkedList<>();
-    private static final List<ContainerLauncher> testContainerLaunchers = new LinkedList<>();
-
-
-    public static void reset() {
-      numContainerLaunchers.set(0);
-      containerLauncherIndices.clear();
-      yarnContainerLauncherCreated.set(false);
-      uberContainerLauncherCreated.set(false);
-      containerLauncherContexts.clear();
-      containerLauncherNames.clear();
-      testContainerLaunchers.clear();
-    }
-
-    public ContainerLaucherRouterForMultipleLauncherTest(AppContext context,
-                                                         TaskAttemptListener taskAttemptListener,
-                                                         String workingDirectory,
-                                                         List<NamedEntityDescriptor> containerLauncherDescriptors,
-                                                         boolean isPureLocalMode) throws
-        UnknownHostException {
-      super(context, taskAttemptListener, workingDirectory,
-          containerLauncherDescriptors, isPureLocalMode);
-    }
-
-    @Override
-    ContainerLauncher createContainerLauncher(NamedEntityDescriptor containerLauncherDescriptor,
-                                              AppContext context,
-                                              ContainerLauncherContext containerLauncherContext,
-                                              TaskAttemptListener taskAttemptListener,
-                                              String workingDirectory,
-                                              int containerLauncherIndex,
-                                              boolean isPureLocalMode) {
-      numContainerLaunchers.incrementAndGet();
-      boolean added = containerLauncherIndices.add(containerLauncherIndex);
-      assertTrue("Cannot add multiple launchers with the same index", added);
-      containerLauncherNames.add(containerLauncherDescriptor.getEntityName());
-      containerLauncherContexts.add(containerLauncherContext);
-      return super
-          .createContainerLauncher(containerLauncherDescriptor, context, containerLauncherContext,
-              taskAttemptListener, workingDirectory, containerLauncherIndex, isPureLocalMode);
-    }
-
-    @Override
-    ContainerLauncher createYarnContainerLauncher(
-        ContainerLauncherContext containerLauncherContext) {
-      yarnContainerLauncherCreated.set(true);
-      testContainerLaunchers.add(yarnContainerLauncher);
-      return yarnContainerLauncher;
-    }
-
-    @Override
-    ContainerLauncher createUberContainerLauncher(ContainerLauncherContext containerLauncherContext,
-                                                  AppContext context,
-                                                  TaskAttemptListener taskAttemptListener,
-                                                  String workingDirectory,
-                                                  boolean isPureLocalMode) {
-      uberContainerLauncherCreated.set(true);
-      testContainerLaunchers.add(uberContainerlauncher);
-      return uberContainerlauncher;
-    }
-
-    @Override
-    ContainerLauncher createCustomContainerLauncher(
-        ContainerLauncherContext containerLauncherContext,
-        NamedEntityDescriptor containerLauncherDescriptor) {
-      ContainerLauncher spyLauncher = spy(super.createCustomContainerLauncher(
-          containerLauncherContext, containerLauncherDescriptor));
-      testContainerLaunchers.add(spyLauncher);
-      return spyLauncher;
-    }
-
-    public int getNumContainerLaunchers() {
-      return numContainerLaunchers.get();
-    }
-
-    public boolean getYarnContainerLauncherCreated() {
-      return yarnContainerLauncherCreated.get();
-    }
-
-    public boolean getUberContainerLauncherCreated() {
-      return uberContainerLauncherCreated.get();
-    }
-
-    public String getContainerLauncherName(int containerLauncherIndex) {
-      return containerLauncherNames.get(containerLauncherIndex);
-    }
-
-    public ContainerLauncher getTestContainerLauncher(int containerLauncherIndex) {
-      return testContainerLaunchers.get(containerLauncherIndex);
-    }
-
-    public ContainerLauncherContext getContainerLauncherContext(int containerLauncherIndex) {
-      return containerLauncherContexts.get(containerLauncherIndex);
-    }
-  }
-
-  private static class FakeContainerLauncher extends ContainerLauncher {
-
-    public FakeContainerLauncher(
-        ContainerLauncherContext containerLauncherContext) {
-      super(containerLauncherContext);
-    }
-
-    @Override
-    public void launchContainer(ContainerLaunchRequest launchRequest) {
-
-    }
-
-    @Override
-    public void stopContainer(ContainerStopRequest stopRequest) {
-
-    }
-  }
-
-}