You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/26 01:48:50 UTC
[3/7] tez git commit: TEZ-2708. Rename classes and variables post
TEZ-2003 changes. (sseth)
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 6ee741a..a8ba445 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -117,7 +117,7 @@ import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.MockClock;
import org.apache.tez.dag.app.TaskAttemptEventInfo;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.RootInputInitializerManager;
@@ -153,7 +153,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl.VertexManagerWithException.VMExceptionLocation;
-import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
+import org.apache.tez.dag.app.rm.TaskSchedulerManager;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
import org.apache.tez.dag.history.DAGHistoryEvent;
@@ -161,7 +161,6 @@ import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
-import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -228,7 +227,7 @@ public class TestVertexImpl {
private Map<String, VertexImpl> vertices;
private Map<TezVertexID, VertexImpl> vertexIdMap;
private DrainDispatcher dispatcher;
- private TaskAttemptListener taskAttemptListener;
+ private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
private Clock clock = new SystemClock();
private TaskHeartbeatHandler thh;
private AppContext appContext;
@@ -2077,16 +2076,16 @@ public class TestVertexImpl {
if (useCustomInitializer) {
if (customInitializer == null) {
v = new VertexImplWithControlledInitializerManager(vertexId, vPlan, vPlan.getName(), conf,
- dispatcher.getEventHandler(), taskAttemptListener,
+ dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
clock, thh, appContext, locationHint, dispatcher, updateTracker);
} else {
v = new VertexImplWithRunningInputInitializer(vertexId, vPlan, vPlan.getName(), conf,
- dispatcher.getEventHandler(), taskAttemptListener,
+ dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
clock, thh, appContext, locationHint, dispatcher, customInitializer, updateTracker);
}
} else {
v = new VertexImpl(vertexId, vPlan, vPlan.getName(), conf,
- dispatcher.getEventHandler(), taskAttemptListener,
+ dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
clock, thh, true, appContext, locationHint, vertexGroups, taskSpecificLaunchCmdOption,
updateTracker);
}
@@ -2162,7 +2161,7 @@ public class TestVertexImpl {
appContext = mock(AppContext.class);
thh = mock(TaskHeartbeatHandler.class);
historyEventHandler = mock(HistoryEventHandler.class);
- TaskSchedulerEventHandler taskScheduler = mock(TaskSchedulerEventHandler.class);
+ TaskSchedulerManager taskScheduler = mock(TaskSchedulerManager.class);
UserGroupInformation ugi;
try {
ugi = UserGroupInformation.getCurrentUser();
@@ -3227,7 +3226,7 @@ public class TestVertexImpl {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
AMContainerMap containers = new AMContainerMap(
- mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appContext);
containers.addContainerIfNew(container, 0, 0, 0);
doReturn(containers).when(appContext).getAllContainers();
@@ -3262,7 +3261,7 @@ public class TestVertexImpl {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
AMContainerMap containers = new AMContainerMap(
- mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appContext);
containers.addContainerIfNew(container, 0, 0, 0);
doReturn(containers).when(appContext).getAllContainers();
@@ -3298,7 +3297,7 @@ public class TestVertexImpl {
when(container.getNodeId()).thenReturn(nid);
when(container.getNodeHttpAddress()).thenReturn("localhost:0");
AMContainerMap containers = new AMContainerMap(
- mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appContext);
containers.addContainerIfNew(container, 0, 0, 0);
doReturn(containers).when(appContext).getAllContainers();
@@ -5179,7 +5178,7 @@ public class TestVertexImpl {
vId = TezVertexID.getInstance(invalidDagId, 1);
VertexPlan vPlan = invalidDagPlan.getVertex(0);
VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), conf,
- dispatcher.getEventHandler(), taskAttemptListener,
+ dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
updateTracker);
v.setInputVertices(new HashMap());
@@ -5208,7 +5207,7 @@ public class TestVertexImpl {
VertexPlan vertexPlan, String vertexName,
Configuration conf,
EventHandler eventHandler,
- TaskAttemptListener taskAttemptListener,
+ TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
Clock clock, TaskHeartbeatHandler thh,
AppContext appContext,
VertexLocationHint vertexLocationHint,
@@ -5216,7 +5215,7 @@ public class TestVertexImpl {
InputInitializer presetInitializer,
StateChangeNotifier updateTracker) {
super(vertexId, vertexPlan, vertexName, conf, eventHandler,
- taskAttemptListener, clock, thh, true,
+ taskCommunicatorManagerInterface, clock, thh, true,
appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
updateTracker);
this.presetInitializer = presetInitializer;
@@ -5248,14 +5247,14 @@ public class TestVertexImpl {
VertexPlan vertexPlan, String vertexName,
Configuration conf,
EventHandler eventHandler,
- TaskAttemptListener taskAttemptListener,
+ TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
Clock clock, TaskHeartbeatHandler thh,
AppContext appContext,
VertexLocationHint vertexLocationHint,
DrainDispatcher dispatcher,
StateChangeNotifier updateTracker) {
super(vertexId, vertexPlan, vertexName, conf, eventHandler,
- taskAttemptListener, clock, thh, true,
+ taskCommunicatorManagerInterface, clock, thh, true,
appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption,
updateTracker);
this.dispatcher = dispatcher;
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
index 0e34f68..8bd288a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl2.java
@@ -54,7 +54,7 @@ import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.DAGAppMaster;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
@@ -517,7 +517,7 @@ public class TestVertexImpl2 {
vertex =
new VertexImpl(TezVertexID.fromString("vertex_1418197758681_0001_1_00"), vertexPlan,
- "testvertex", conf, mock(EventHandler.class), mock(TaskAttemptListener.class),
+ "testvertex", conf, mock(EventHandler.class), mock(TaskCommunicatorManagerInterface.class),
mock(Clock.class), mock(TaskHeartbeatHandler.class), false, mockAppContext,
VertexLocationHint.create(new LinkedList<TaskLocationHint>()), null,
new TaskSpecificLaunchCmdOption(conf), mock(StateChangeNotifier.class));
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index 0f532fb..e389d64 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -55,7 +55,7 @@ import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Task;
@@ -434,7 +434,7 @@ public class TestVertexRecovery {
DAGPlan dagPlan = createDAGPlan();
dag =
new DAGImpl(dagId, new Configuration(), dagPlan,
- dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
+ dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class),
new Credentials(), new SystemClock(), user,
mock(TaskHeartbeatHandler.class), mockAppContext);
when(mockAppContext.getCurrentDAG()).thenReturn(dag);
@@ -544,7 +544,7 @@ public class TestVertexRecovery {
DAGPlan dagPlan = createDAGPlanSingleVertex();
dag =
new DAGImpl(dagId, new Configuration(), dagPlan,
- dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
+ dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class),
new Credentials(), new SystemClock(), user,
mock(TaskHeartbeatHandler.class), mockAppContext);
when(mockAppContext.getCurrentDAG()).thenReturn(dag);
@@ -924,7 +924,7 @@ public class TestVertexRecovery {
DAGPlan dagPlan = createDAGPlanMR();
dag =
new DAGImpl(dagId, new Configuration(), dagPlan,
- dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
+ dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class),
new Credentials(), new SystemClock(), user,
mock(TaskHeartbeatHandler.class), mockAppContext);
when(mockAppContext.getCurrentDAG()).thenReturn(dag);
@@ -965,7 +965,7 @@ public class TestVertexRecovery {
DAGPlan dagPlan = createDAGPlan();
dag =
new DAGImpl(dagId, new Configuration(), dagPlan,
- dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
+ dispatcher.getEventHandler(), mock(TaskCommunicatorManagerInterface.class),
new Credentials(), new SystemClock(), user,
mock(TaskHeartbeatHandler.class), mockAppContext);
when(mockAppContext.getCurrentDAG()).thenReturn(dag);
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
new file mode 100644
index 0000000..4b931d4
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherManager.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.launcher;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
+import org.apache.tez.dag.app.rm.ContainerLauncherLaunchRequestEvent;
+import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
+import org.apache.tez.serviceplugins.api.ContainerLauncher;
+import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
+import org.apache.tez.serviceplugins.api.ContainerStopRequest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestContainerLauncherManager {
+
+ @Before
+ @After
+ public void reset() {
+ ContainerLaucherRouterForMultipleLauncherTest.reset();
+ }
+
+ @Test(timeout = 5000)
+ public void testNoLaunchersSpecified() throws IOException {
+
+ AppContext appContext = mock(AppContext.class);
+ TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
+
+ try {
+
+ new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null, null,
+ false);
+ fail("Expecting a failure without any launchers being specified");
+ } catch (IllegalArgumentException e) {
+
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testCustomLauncherSpecified() throws IOException {
+ Configuration conf = new Configuration(false);
+
+ AppContext appContext = mock(AppContext.class);
+ TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
+
+ String customLauncherName = "customLauncher";
+ List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>();
+ ByteBuffer bb = ByteBuffer.allocate(4);
+ bb.putInt(0, 3);
+ UserPayload customPayload = UserPayload.create(bb);
+ launcherDescriptors.add(
+ new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName())
+ .setUserPayload(customPayload));
+
+ ContainerLaucherRouterForMultipleLauncherTest clr =
+ new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null,
+ launcherDescriptors,
+ true);
+ try {
+ clr.init(conf);
+ clr.start();
+
+ assertEquals(1, clr.getNumContainerLaunchers());
+ assertFalse(clr.getYarnContainerLauncherCreated());
+ assertFalse(clr.getUberContainerLauncherCreated());
+ assertEquals(customLauncherName, clr.getContainerLauncherName(0));
+ assertEquals(bb, clr.getContainerLauncherContext(0).getInitialUserPayload().getPayload());
+ } finally {
+ clr.stop();
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testMultipleContainerLaunchers() throws IOException {
+ Configuration conf = new Configuration(false);
+ conf.set("testkey", "testvalue");
+ UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
+
+ AppContext appContext = mock(AppContext.class);
+ TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
+
+ String customLauncherName = "customLauncher";
+ List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>();
+ ByteBuffer bb = ByteBuffer.allocate(4);
+ bb.putInt(0, 3);
+ UserPayload customPayload = UserPayload.create(bb);
+ launcherDescriptors.add(
+ new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName())
+ .setUserPayload(customPayload));
+ launcherDescriptors
+ .add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+ .setUserPayload(userPayload));
+
+ ContainerLaucherRouterForMultipleLauncherTest clr =
+ new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null,
+ launcherDescriptors,
+ true);
+ try {
+ clr.init(conf);
+ clr.start();
+
+ assertEquals(2, clr.getNumContainerLaunchers());
+ assertTrue(clr.getYarnContainerLauncherCreated());
+ assertFalse(clr.getUberContainerLauncherCreated());
+ assertEquals(customLauncherName, clr.getContainerLauncherName(0));
+ assertEquals(bb, clr.getContainerLauncherContext(0).getInitialUserPayload().getPayload());
+
+ assertEquals(TezConstants.getTezYarnServicePluginName(), clr.getContainerLauncherName(1));
+ Configuration confParsed = TezUtils
+ .createConfFromUserPayload(clr.getContainerLauncherContext(1).getInitialUserPayload());
+ assertEquals("testvalue", confParsed.get("testkey"));
+ } finally {
+ clr.stop();
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testEventRouting() throws Exception {
+ Configuration conf = new Configuration(false);
+ UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
+
+ AppContext appContext = mock(AppContext.class);
+ TaskCommunicatorManagerInterface tal = mock(TaskCommunicatorManagerInterface.class);
+
+ String customLauncherName = "customLauncher";
+ List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>();
+ ByteBuffer bb = ByteBuffer.allocate(4);
+ bb.putInt(0, 3);
+ UserPayload customPayload = UserPayload.create(bb);
+ launcherDescriptors.add(
+ new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName())
+ .setUserPayload(customPayload));
+ launcherDescriptors
+ .add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
+ .setUserPayload(userPayload));
+
+ ContainerLaucherRouterForMultipleLauncherTest clr =
+ new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null,
+ launcherDescriptors,
+ true);
+ try {
+ clr.init(conf);
+ clr.start();
+
+ assertEquals(2, clr.getNumContainerLaunchers());
+ assertTrue(clr.getYarnContainerLauncherCreated());
+ assertFalse(clr.getUberContainerLauncherCreated());
+ assertEquals(customLauncherName, clr.getContainerLauncherName(0));
+ assertEquals(TezConstants.getTezYarnServicePluginName(), clr.getContainerLauncherName(1));
+
+ verify(clr.getTestContainerLauncher(0)).initialize();
+ verify(clr.getTestContainerLauncher(0)).start();
+ verify(clr.getTestContainerLauncher(1)).initialize();
+ verify(clr.getTestContainerLauncher(1)).start();
+
+ ContainerLaunchContext clc1 = mock(ContainerLaunchContext.class);
+ Container container1 = mock(Container.class);
+
+ ContainerLaunchContext clc2 = mock(ContainerLaunchContext.class);
+ Container container2 = mock(Container.class);
+
+ ContainerLauncherLaunchRequestEvent launchRequestEvent1 =
+ new ContainerLauncherLaunchRequestEvent(clc1, container1, 0, 0, 0);
+ ContainerLauncherLaunchRequestEvent launchRequestEvent2 =
+ new ContainerLauncherLaunchRequestEvent(clc2, container2, 1, 0, 0);
+
+ clr.handle(launchRequestEvent1);
+
+
+ ArgumentCaptor<ContainerLaunchRequest> captor =
+ ArgumentCaptor.forClass(ContainerLaunchRequest.class);
+ verify(clr.getTestContainerLauncher(0)).launchContainer(captor.capture());
+ assertEquals(1, captor.getAllValues().size());
+ ContainerLaunchRequest launchRequest1 = captor.getValue();
+ assertEquals(clc1, launchRequest1.getContainerLaunchContext());
+
+ clr.handle(launchRequestEvent2);
+ captor = ArgumentCaptor.forClass(ContainerLaunchRequest.class);
+ verify(clr.getTestContainerLauncher(1)).launchContainer(captor.capture());
+ assertEquals(1, captor.getAllValues().size());
+ ContainerLaunchRequest launchRequest2 = captor.getValue();
+ assertEquals(clc2, launchRequest2.getContainerLaunchContext());
+
+ } finally {
+ clr.stop();
+ verify(clr.getTestContainerLauncher(0)).shutdown();
+ verify(clr.getTestContainerLauncher(1)).shutdown();
+ }
+ }
+
+ private static class ContainerLaucherRouterForMultipleLauncherTest
+ extends ContainerLauncherManager {
+
+ // All variables setup as static since methods being overridden are invoked by the ContainerLauncherRouter ctor,
+ // and regular variables will not be initialized at this point.
+ private static final AtomicInteger numContainerLaunchers = new AtomicInteger(0);
+ private static final Set<Integer> containerLauncherIndices = new HashSet<>();
+ private static final ContainerLauncher yarnContainerLauncher = mock(ContainerLauncher.class);
+ private static final ContainerLauncher uberContainerlauncher = mock(ContainerLauncher.class);
+ private static final AtomicBoolean yarnContainerLauncherCreated = new AtomicBoolean(false);
+ private static final AtomicBoolean uberContainerLauncherCreated = new AtomicBoolean(false);
+
+ private static final List<ContainerLauncherContext> containerLauncherContexts =
+ new LinkedList<>();
+ private static final List<String> containerLauncherNames = new LinkedList<>();
+ private static final List<ContainerLauncher> testContainerLaunchers = new LinkedList<>();
+
+
+ public static void reset() {
+ numContainerLaunchers.set(0);
+ containerLauncherIndices.clear();
+ yarnContainerLauncherCreated.set(false);
+ uberContainerLauncherCreated.set(false);
+ containerLauncherContexts.clear();
+ containerLauncherNames.clear();
+ testContainerLaunchers.clear();
+ }
+
+ public ContainerLaucherRouterForMultipleLauncherTest(AppContext context,
+ TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
+ String workingDirectory,
+ List<NamedEntityDescriptor> containerLauncherDescriptors,
+ boolean isPureLocalMode) throws
+ UnknownHostException {
+ super(context, taskCommunicatorManagerInterface, workingDirectory,
+ containerLauncherDescriptors, isPureLocalMode);
+ }
+
+ @Override
+ ContainerLauncher createContainerLauncher(NamedEntityDescriptor containerLauncherDescriptor,
+ AppContext context,
+ ContainerLauncherContext containerLauncherContext,
+ TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
+ String workingDirectory,
+ int containerLauncherIndex,
+ boolean isPureLocalMode) {
+ numContainerLaunchers.incrementAndGet();
+ boolean added = containerLauncherIndices.add(containerLauncherIndex);
+ assertTrue("Cannot add multiple launchers with the same index", added);
+ containerLauncherNames.add(containerLauncherDescriptor.getEntityName());
+ containerLauncherContexts.add(containerLauncherContext);
+ return super
+ .createContainerLauncher(containerLauncherDescriptor, context, containerLauncherContext,
+ taskCommunicatorManagerInterface, workingDirectory, containerLauncherIndex, isPureLocalMode);
+ }
+
+ @Override
+ ContainerLauncher createYarnContainerLauncher(
+ ContainerLauncherContext containerLauncherContext) {
+ yarnContainerLauncherCreated.set(true);
+ testContainerLaunchers.add(yarnContainerLauncher);
+ return yarnContainerLauncher;
+ }
+
+ @Override
+ ContainerLauncher createUberContainerLauncher(ContainerLauncherContext containerLauncherContext,
+ AppContext context,
+ TaskCommunicatorManagerInterface taskCommunicatorManagerInterface,
+ String workingDirectory,
+ boolean isPureLocalMode) {
+ uberContainerLauncherCreated.set(true);
+ testContainerLaunchers.add(uberContainerlauncher);
+ return uberContainerlauncher;
+ }
+
+ @Override
+ ContainerLauncher createCustomContainerLauncher(
+ ContainerLauncherContext containerLauncherContext,
+ NamedEntityDescriptor containerLauncherDescriptor) {
+ ContainerLauncher spyLauncher = spy(super.createCustomContainerLauncher(
+ containerLauncherContext, containerLauncherDescriptor));
+ testContainerLaunchers.add(spyLauncher);
+ return spyLauncher;
+ }
+
+ public int getNumContainerLaunchers() {
+ return numContainerLaunchers.get();
+ }
+
+ public boolean getYarnContainerLauncherCreated() {
+ return yarnContainerLauncherCreated.get();
+ }
+
+ public boolean getUberContainerLauncherCreated() {
+ return uberContainerLauncherCreated.get();
+ }
+
+ public String getContainerLauncherName(int containerLauncherIndex) {
+ return containerLauncherNames.get(containerLauncherIndex);
+ }
+
+ public ContainerLauncher getTestContainerLauncher(int containerLauncherIndex) {
+ return testContainerLaunchers.get(containerLauncherIndex);
+ }
+
+ public ContainerLauncherContext getContainerLauncherContext(int containerLauncherIndex) {
+ return containerLauncherContexts.get(containerLauncherIndex);
+ }
+ }
+
+ private static class FakeContainerLauncher extends ContainerLauncher {
+
+ public FakeContainerLauncher(
+ ContainerLauncherContext containerLauncherContext) {
+ super(containerLauncherContext);
+ }
+
+ @Override
+ public void launchContainer(ContainerLaunchRequest launchRequest) {
+
+ }
+
+ @Override
+ public void stopContainer(ContainerStopRequest stopRequest) {
+
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java b/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java
deleted file mode 100644
index d0caf8c..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/launcher/TestContainerLauncherRouter.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.launcher;
-
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.TaskAttemptListener;
-import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
-import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
-import org.apache.tez.serviceplugins.api.ContainerLauncher;
-import org.apache.tez.serviceplugins.api.ContainerLauncherContext;
-import org.apache.tez.serviceplugins.api.ContainerStopRequest;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-
-public class TestContainerLauncherRouter {
-
- @Before
- @After
- public void reset() {
- ContainerLaucherRouterForMultipleLauncherTest.reset();
- }
-
- @Test(timeout = 5000)
- public void testNoLaunchersSpecified() throws IOException {
-
- AppContext appContext = mock(AppContext.class);
- TaskAttemptListener tal = mock(TaskAttemptListener.class);
-
- try {
-
- new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null, null,
- false);
- fail("Expecting a failure without any launchers being specified");
- } catch (IllegalArgumentException e) {
-
- }
- }
-
- @Test(timeout = 5000)
- public void testCustomLauncherSpecified() throws IOException {
- Configuration conf = new Configuration(false);
-
- AppContext appContext = mock(AppContext.class);
- TaskAttemptListener tal = mock(TaskAttemptListener.class);
-
- String customLauncherName = "customLauncher";
- List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>();
- ByteBuffer bb = ByteBuffer.allocate(4);
- bb.putInt(0, 3);
- UserPayload customPayload = UserPayload.create(bb);
- launcherDescriptors.add(
- new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName())
- .setUserPayload(customPayload));
-
- ContainerLaucherRouterForMultipleLauncherTest clr =
- new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null,
- launcherDescriptors,
- true);
- try {
- clr.init(conf);
- clr.start();
-
- assertEquals(1, clr.getNumContainerLaunchers());
- assertFalse(clr.getYarnContainerLauncherCreated());
- assertFalse(clr.getUberContainerLauncherCreated());
- assertEquals(customLauncherName, clr.getContainerLauncherName(0));
- assertEquals(bb, clr.getContainerLauncherContext(0).getInitialUserPayload().getPayload());
- } finally {
- clr.stop();
- }
- }
-
- @Test(timeout = 5000)
- public void testMultipleContainerLaunchers() throws IOException {
- Configuration conf = new Configuration(false);
- conf.set("testkey", "testvalue");
- UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
-
- AppContext appContext = mock(AppContext.class);
- TaskAttemptListener tal = mock(TaskAttemptListener.class);
-
- String customLauncherName = "customLauncher";
- List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>();
- ByteBuffer bb = ByteBuffer.allocate(4);
- bb.putInt(0, 3);
- UserPayload customPayload = UserPayload.create(bb);
- launcherDescriptors.add(
- new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName())
- .setUserPayload(customPayload));
- launcherDescriptors
- .add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
- .setUserPayload(userPayload));
-
- ContainerLaucherRouterForMultipleLauncherTest clr =
- new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null,
- launcherDescriptors,
- true);
- try {
- clr.init(conf);
- clr.start();
-
- assertEquals(2, clr.getNumContainerLaunchers());
- assertTrue(clr.getYarnContainerLauncherCreated());
- assertFalse(clr.getUberContainerLauncherCreated());
- assertEquals(customLauncherName, clr.getContainerLauncherName(0));
- assertEquals(bb, clr.getContainerLauncherContext(0).getInitialUserPayload().getPayload());
-
- assertEquals(TezConstants.getTezYarnServicePluginName(), clr.getContainerLauncherName(1));
- Configuration confParsed = TezUtils
- .createConfFromUserPayload(clr.getContainerLauncherContext(1).getInitialUserPayload());
- assertEquals("testvalue", confParsed.get("testkey"));
- } finally {
- clr.stop();
- }
- }
-
- @Test(timeout = 5000)
- public void testEventRouting() throws Exception {
- Configuration conf = new Configuration(false);
- UserPayload userPayload = TezUtils.createUserPayloadFromConf(conf);
-
- AppContext appContext = mock(AppContext.class);
- TaskAttemptListener tal = mock(TaskAttemptListener.class);
-
- String customLauncherName = "customLauncher";
- List<NamedEntityDescriptor> launcherDescriptors = new LinkedList<>();
- ByteBuffer bb = ByteBuffer.allocate(4);
- bb.putInt(0, 3);
- UserPayload customPayload = UserPayload.create(bb);
- launcherDescriptors.add(
- new NamedEntityDescriptor(customLauncherName, FakeContainerLauncher.class.getName())
- .setUserPayload(customPayload));
- launcherDescriptors
- .add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
- .setUserPayload(userPayload));
-
- ContainerLaucherRouterForMultipleLauncherTest clr =
- new ContainerLaucherRouterForMultipleLauncherTest(appContext, tal, null,
- launcherDescriptors,
- true);
- try {
- clr.init(conf);
- clr.start();
-
- assertEquals(2, clr.getNumContainerLaunchers());
- assertTrue(clr.getYarnContainerLauncherCreated());
- assertFalse(clr.getUberContainerLauncherCreated());
- assertEquals(customLauncherName, clr.getContainerLauncherName(0));
- assertEquals(TezConstants.getTezYarnServicePluginName(), clr.getContainerLauncherName(1));
-
- verify(clr.getTestContainerLauncher(0)).initialize();
- verify(clr.getTestContainerLauncher(0)).start();
- verify(clr.getTestContainerLauncher(1)).initialize();
- verify(clr.getTestContainerLauncher(1)).start();
-
- ContainerLaunchContext clc1 = mock(ContainerLaunchContext.class);
- Container container1 = mock(Container.class);
-
- ContainerLaunchContext clc2 = mock(ContainerLaunchContext.class);
- Container container2 = mock(Container.class);
-
- NMCommunicatorLaunchRequestEvent launchRequestEvent1 =
- new NMCommunicatorLaunchRequestEvent(clc1, container1, 0, 0, 0);
- NMCommunicatorLaunchRequestEvent launchRequestEvent2 =
- new NMCommunicatorLaunchRequestEvent(clc2, container2, 1, 0, 0);
-
- clr.handle(launchRequestEvent1);
-
-
- ArgumentCaptor<ContainerLaunchRequest> captor =
- ArgumentCaptor.forClass(ContainerLaunchRequest.class);
- verify(clr.getTestContainerLauncher(0)).launchContainer(captor.capture());
- assertEquals(1, captor.getAllValues().size());
- ContainerLaunchRequest launchRequest1 = captor.getValue();
- assertEquals(clc1, launchRequest1.getContainerLaunchContext());
-
- clr.handle(launchRequestEvent2);
- captor = ArgumentCaptor.forClass(ContainerLaunchRequest.class);
- verify(clr.getTestContainerLauncher(1)).launchContainer(captor.capture());
- assertEquals(1, captor.getAllValues().size());
- ContainerLaunchRequest launchRequest2 = captor.getValue();
- assertEquals(clc2, launchRequest2.getContainerLaunchContext());
-
- } finally {
- clr.stop();
- verify(clr.getTestContainerLauncher(0)).shutdown();
- verify(clr.getTestContainerLauncher(1)).shutdown();
- }
- }
-
- private static class ContainerLaucherRouterForMultipleLauncherTest
- extends ContainerLauncherRouter {
-
- // All variables setup as static since methods being overridden are invoked by the ContainerLauncherRouter ctor,
- // and regular variables will not be initialized at this point.
- private static final AtomicInteger numContainerLaunchers = new AtomicInteger(0);
- private static final Set<Integer> containerLauncherIndices = new HashSet<>();
- private static final ContainerLauncher yarnContainerLauncher = mock(ContainerLauncher.class);
- private static final ContainerLauncher uberContainerlauncher = mock(ContainerLauncher.class);
- private static final AtomicBoolean yarnContainerLauncherCreated = new AtomicBoolean(false);
- private static final AtomicBoolean uberContainerLauncherCreated = new AtomicBoolean(false);
-
- private static final List<ContainerLauncherContext> containerLauncherContexts =
- new LinkedList<>();
- private static final List<String> containerLauncherNames = new LinkedList<>();
- private static final List<ContainerLauncher> testContainerLaunchers = new LinkedList<>();
-
-
- public static void reset() {
- numContainerLaunchers.set(0);
- containerLauncherIndices.clear();
- yarnContainerLauncherCreated.set(false);
- uberContainerLauncherCreated.set(false);
- containerLauncherContexts.clear();
- containerLauncherNames.clear();
- testContainerLaunchers.clear();
- }
-
- public ContainerLaucherRouterForMultipleLauncherTest(AppContext context,
- TaskAttemptListener taskAttemptListener,
- String workingDirectory,
- List<NamedEntityDescriptor> containerLauncherDescriptors,
- boolean isPureLocalMode) throws
- UnknownHostException {
- super(context, taskAttemptListener, workingDirectory,
- containerLauncherDescriptors, isPureLocalMode);
- }
-
- @Override
- ContainerLauncher createContainerLauncher(NamedEntityDescriptor containerLauncherDescriptor,
- AppContext context,
- ContainerLauncherContext containerLauncherContext,
- TaskAttemptListener taskAttemptListener,
- String workingDirectory,
- int containerLauncherIndex,
- boolean isPureLocalMode) {
- numContainerLaunchers.incrementAndGet();
- boolean added = containerLauncherIndices.add(containerLauncherIndex);
- assertTrue("Cannot add multiple launchers with the same index", added);
- containerLauncherNames.add(containerLauncherDescriptor.getEntityName());
- containerLauncherContexts.add(containerLauncherContext);
- return super
- .createContainerLauncher(containerLauncherDescriptor, context, containerLauncherContext,
- taskAttemptListener, workingDirectory, containerLauncherIndex, isPureLocalMode);
- }
-
- @Override
- ContainerLauncher createYarnContainerLauncher(
- ContainerLauncherContext containerLauncherContext) {
- yarnContainerLauncherCreated.set(true);
- testContainerLaunchers.add(yarnContainerLauncher);
- return yarnContainerLauncher;
- }
-
- @Override
- ContainerLauncher createUberContainerLauncher(ContainerLauncherContext containerLauncherContext,
- AppContext context,
- TaskAttemptListener taskAttemptListener,
- String workingDirectory,
- boolean isPureLocalMode) {
- uberContainerLauncherCreated.set(true);
- testContainerLaunchers.add(uberContainerlauncher);
- return uberContainerlauncher;
- }
-
- @Override
- ContainerLauncher createCustomContainerLauncher(
- ContainerLauncherContext containerLauncherContext,
- NamedEntityDescriptor containerLauncherDescriptor) {
- ContainerLauncher spyLauncher = spy(super.createCustomContainerLauncher(
- containerLauncherContext, containerLauncherDescriptor));
- testContainerLaunchers.add(spyLauncher);
- return spyLauncher;
- }
-
- public int getNumContainerLaunchers() {
- return numContainerLaunchers.get();
- }
-
- public boolean getYarnContainerLauncherCreated() {
- return yarnContainerLauncherCreated.get();
- }
-
- public boolean getUberContainerLauncherCreated() {
- return uberContainerLauncherCreated.get();
- }
-
- public String getContainerLauncherName(int containerLauncherIndex) {
- return containerLauncherNames.get(containerLauncherIndex);
- }
-
- public ContainerLauncher getTestContainerLauncher(int containerLauncherIndex) {
- return testContainerLaunchers.get(containerLauncherIndex);
- }
-
- public ContainerLauncherContext getContainerLauncherContext(int containerLauncherIndex) {
- return containerLauncherContexts.get(containerLauncherIndex);
- }
- }
-
- private static class FakeContainerLauncher extends ContainerLauncher {
-
- public FakeContainerLauncher(
- ContainerLauncherContext containerLauncherContext) {
- super(containerLauncherContext);
- }
-
- @Override
- public void launchContainer(ContainerLaunchRequest launchRequest) {
-
- }
-
- @Override
- public void stopContainer(ContainerStopRequest stopRequest) {
-
- }
- }
-
-}