You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/26 01:48:49 UTC
[2/7] tez git commit: TEZ-2708. Rename classes and variables post
TEZ-2003 changes. (sseth)
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index 0a02f9e..0e90681 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -68,7 +68,7 @@ import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.DAGAppMasterState;
-import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest;
@@ -76,7 +76,7 @@ import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.CapturingEventHandler;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable;
-import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerEventHandlerForTest;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerManagerForTest;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext;
import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
@@ -136,7 +136,7 @@ public class TestContainerReuse {
doReturn(conf).when(appContext).getAMConf();
AMContainerMap amContainerMap = new AMContainerMap(
mock(ContainerHeartbeatHandler.class),
- mock(TaskAttemptListener.class),
+ mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appContext);
AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
doReturn(amContainerMap).when(appContext).getAllContainers();
@@ -145,18 +145,18 @@ public class TestContainerReuse {
doReturn(dagID).when(appContext).getCurrentDAGID();
doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
- TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
- new TaskSchedulerEventHandlerForTest(
+ TaskSchedulerManager taskSchedulerManagerReal =
+ new TaskSchedulerManagerForTest(
appContext, eventHandler, rmClient,
new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(conf));
- TaskSchedulerEventHandler taskSchedulerEventHandler =
- spy(taskSchedulerEventHandlerReal);
- taskSchedulerEventHandler.init(conf);
- taskSchedulerEventHandler.start();
+ TaskSchedulerManager taskSchedulerManager =
+ spy(taskSchedulerManagerReal);
+ taskSchedulerManager.init(conf);
+ taskSchedulerManager.start();
TaskSchedulerWithDrainableContext taskScheduler =
(TaskSchedulerWithDrainableContext)
- ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+ ((TaskSchedulerManagerForTest) taskSchedulerManager)
.getSpyTaskScheduler();
TaskSchedulerContextDrainable drainableAppCallback =
taskScheduler.getDrainableAppCallback();
@@ -192,10 +192,10 @@ public class TestContainerReuse {
defaultRack, priority);
drainNotifier.set(false);
- taskSchedulerEventHandler.handleEvent(lrTa11);
+ taskSchedulerManager.handleEvent(lrTa11);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainNotifier.set(false);
- taskSchedulerEventHandler.handleEvent(lrTa21);
+ taskSchedulerManager.handleEvent(lrTa21);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
Container containerHost1 = createContainer(1, host1[0], resource, priority);
@@ -206,28 +206,28 @@ public class TestContainerReuse {
Lists.newArrayList(containerHost1, containerHost2));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(
+ verify(taskSchedulerManager).taskAllocated(
eq(0), eq(ta11), any(Object.class), eq(containerHost1));
- verify(taskSchedulerEventHandler).taskAllocated(
+ verify(taskSchedulerManager).taskAllocated(
eq(0), eq(ta21), any(Object.class), eq(containerHost2));
// Adding the event later so that task1 assigned to containerHost1
// is deterministic.
- taskSchedulerEventHandler.handleEvent(lrTa31);
+ taskSchedulerManager.handleEvent(lrTa31);
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(
ta11, containerHost1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0));
drainableAppCallback.drain();
verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
- verify(taskSchedulerEventHandler, times(1)).taskAllocated(
+ verify(taskSchedulerManager, times(1)).taskAllocated(
eq(0), eq(ta31), any(Object.class), eq(containerHost1));
verify(rmClient, times(0)).releaseAssignedContainer(
eq(containerHost1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
TaskAttemptState.SUCCEEDED, null, null, 0));
@@ -235,7 +235,7 @@ public class TestContainerReuse {
Throwable exception = null;
while (System.currentTimeMillis() < currentTs + 5000l) {
try {
- verify(taskSchedulerEventHandler,
+ verify(taskSchedulerManager,
times(1)).containerBeingReleased(eq(0), eq(containerHost2.getId()));
exception = null;
break;
@@ -245,7 +245,7 @@ public class TestContainerReuse {
}
assertTrue("containerHost2 was not released", exception == null);
taskScheduler.shutdown();
- taskSchedulerEventHandler.close();
+ taskSchedulerManager.close();
}
@Test(timeout = 15000l)
@@ -272,7 +272,7 @@ public class TestContainerReuse {
doReturn(new Configuration(false)).when(appContext).getAMConf();
AMContainerMap amContainerMap = new AMContainerMap(
mock(ContainerHeartbeatHandler.class),
- mock(TaskAttemptListener.class),
+ mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appContext);
AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
doReturn(amContainerMap).when(appContext).getAllContainers();
@@ -281,17 +281,17 @@ public class TestContainerReuse {
doReturn(dagID).when(appContext).getCurrentDAGID();
doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
- TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
- new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
+ TaskSchedulerManager taskSchedulerManagerReal =
+ new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient,
new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(conf));
- TaskSchedulerEventHandler taskSchedulerEventHandler =
- spy(taskSchedulerEventHandlerReal);
- taskSchedulerEventHandler.init(conf);
- taskSchedulerEventHandler.start();
+ TaskSchedulerManager taskSchedulerManager =
+ spy(taskSchedulerManagerReal);
+ taskSchedulerManager.init(conf);
+ taskSchedulerManager.start();
TaskSchedulerWithDrainableContext taskScheduler =
(TaskSchedulerWithDrainableContext)
- ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+ ((TaskSchedulerManagerForTest) taskSchedulerManager)
.getSpyTaskScheduler();
TaskSchedulerContextDrainable drainableAppCallback =
taskScheduler.getDrainableAppCallback();
@@ -321,10 +321,10 @@ public class TestContainerReuse {
taID31, ta31, resource, host1, defaultRack, priority);
drainNotifier.set(false);
- taskSchedulerEventHandler.handleEvent(lrTa11);
+ taskSchedulerManager.handleEvent(lrTa11);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainNotifier.set(false);
- taskSchedulerEventHandler.handleEvent(lrTa21);
+ taskSchedulerManager.handleEvent(lrTa21);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
Container containerHost1 = createContainer(1, host1[0], resource, priority);
@@ -334,26 +334,26 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Lists.newArrayList(containerHost1, containerHost2));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1));
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta21), any(Object.class),
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(containerHost1));
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta21), any(Object.class),
eq(containerHost2));
// Adding the event later so that task1 assigned to containerHost1 is deterministic.
- taskSchedulerEventHandler.handleEvent(lrTa31);
+ taskSchedulerManager.handleEvent(lrTa31);
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta21, containerHost2.getId(),
TaskAttemptState.SUCCEEDED, null, null, 0));
drainableAppCallback.drain();
verifyDeAllocateTask(taskScheduler, ta21, true, null, null);
- verify(taskSchedulerEventHandler, times(0)).taskAllocated(
+ verify(taskSchedulerManager, times(0)).taskAllocated(
eq(0), eq(ta31), any(Object.class), eq(containerHost2));
verify(rmClient, times(1)).releaseAssignedContainer(
eq(containerHost2.getId()));
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
taskScheduler.shutdown();
- taskSchedulerEventHandler.close();
+ taskSchedulerManager.close();
}
@Test(timeout = 10000l)
@@ -375,7 +375,7 @@ public class TestContainerReuse {
AppContext appContext = mock(AppContext.class);
doReturn(new Configuration(false)).when(appContext).getAMConf();
AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
- mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
+ mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
doReturn(amContainerMap).when(appContext).getAllContainers();
doReturn(amNodeTracker).when(appContext).getNodeTracker();
@@ -383,12 +383,13 @@ public class TestContainerReuse {
doReturn(dagID).when(appContext).getCurrentDAGID();
doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
- TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
- TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
- taskSchedulerEventHandler.init(tezConf);
- taskSchedulerEventHandler.start();
+ TaskSchedulerManager
+ taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
+ TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+ taskSchedulerManager.init(tezConf);
+ taskSchedulerManager.start();
- TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+ TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager)
.getSpyTaskScheduler();
TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
@@ -423,10 +424,10 @@ public class TestContainerReuse {
TaskAttempt ta14 = mock(TaskAttempt.class);
AMSchedulerEventTALaunchRequest lrEvent4 = createLaunchRequestEvent(taID14, ta14, resource1, host2, racks, priority1);
- taskSchedulerEventHandler.handleEvent(lrEvent1);
- taskSchedulerEventHandler.handleEvent(lrEvent2);
- taskSchedulerEventHandler.handleEvent(lrEvent3);
- taskSchedulerEventHandler.handleEvent(lrEvent4);
+ taskSchedulerManager.handleEvent(lrEvent1);
+ taskSchedulerManager.handleEvent(lrEvent2);
+ taskSchedulerManager.handleEvent(lrEvent3);
+ taskSchedulerManager.handleEvent(lrEvent4);
Container container1 = createContainer(1, "host1", resource1, priority1);
@@ -435,39 +436,39 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class),
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class),
eq(container1));
// Task assigned to container completed successfully. Container should be re-used.
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null,
null, 0));
drainableAppCallback.drain();
verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1));
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
// Task assigned to container completed successfully.
// Verify reuse across hosts.
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta12, container1.getId(), TaskAttemptState.SUCCEEDED, null,
null, 0));
drainableAppCallback.drain();
verifyDeAllocateTask(taskScheduler, ta12, true, null, null);
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class),
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class),
eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
eventHandler.reset();
// Verify no re-use if a previous task fails.
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta13, container1.getId(), TaskAttemptState.FAILED, null,
"TIMEOUT", 0));
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class),
+ verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class),
eq(container1));
verifyDeAllocateTask(taskScheduler, ta13, false, null, "TIMEOUT");
verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
@@ -481,11 +482,11 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container2));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta14), any(Object.class),
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta14), any(Object.class),
eq(container2));
// Task assigned to container completed successfully. No pending requests. Container should be released.
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta14, container2.getId(), TaskAttemptState.SUCCEEDED, null,
null, 0));
drainableAppCallback.drain();
@@ -495,7 +496,7 @@ public class TestContainerReuse {
eventHandler.reset();
taskScheduler.shutdown();
- taskSchedulerEventHandler.close();
+ taskSchedulerManager.close();
}
@Test(timeout = 10000l)
@@ -521,7 +522,7 @@ public class TestContainerReuse {
AppContext appContext = mock(AppContext.class);
doReturn(new Configuration(false)).when(appContext).getAMConf();
AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
- mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
+ mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
doReturn(amContainerMap).when(appContext).getAllContainers();
doReturn(amNodeTracker).when(appContext).getNodeTracker();
@@ -530,14 +531,14 @@ public class TestContainerReuse {
doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
//Use ContainerContextMatcher here. Otherwise it would not match the JVM options
- TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
- new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
- TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
- taskSchedulerEventHandler.init(tezConf);
- taskSchedulerEventHandler.start();
+ TaskSchedulerManager taskSchedulerManagerReal =
+ new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
+ TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+ taskSchedulerManager.init(tezConf);
+ taskSchedulerManager.start();
TaskSchedulerWithDrainableContext taskScheduler =
- (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+ (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager)
.getSpyTaskScheduler();
TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
@@ -573,10 +574,10 @@ public class TestContainerReuse {
createLaunchRequestEvent(taID12, ta12, resource1, host1, racks, priority1);
drainNotifier.set(false);
- taskSchedulerEventHandler.handleEvent(lrEvent1);
+ taskSchedulerManager.handleEvent(lrEvent1);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainNotifier.set(false);
- taskSchedulerEventHandler.handleEvent(lrEvent2);
+ taskSchedulerManager.handleEvent(lrEvent2);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
Container container1 = createContainer(1, "host1", resource1, priority1);
@@ -586,16 +587,16 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta11), any(Object.class),
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class),
eq(container1));
// First task had profiling on. This container can not be reused further.
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta11, container1.getId(), TaskAttemptState.SUCCEEDED, null,
null, 0));
drainableAppCallback.drain();
verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
- verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta12), any(Object.class),
+ verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta12), any(Object.class),
eq(container1));
verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
@@ -623,10 +624,10 @@ public class TestContainerReuse {
Container container2 = createContainer(2, "host2", resource1, priority1);
drainNotifier.set(false);
- taskSchedulerEventHandler.handleEvent(lrEvent3);
+ taskSchedulerManager.handleEvent(lrEvent3);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainNotifier.set(false);
- taskSchedulerEventHandler.handleEvent(lrEvent4);
+ taskSchedulerManager.handleEvent(lrEvent4);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
// Container started
@@ -634,16 +635,16 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container2));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2));
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2));
// Verify that the container can not be reused when profiling option is turned on
// Even for 2 tasks having same profiling option can have container reusability.
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta13, container2.getId(), TaskAttemptState.SUCCEEDED, null,
null, 0));
drainableAppCallback.drain();
verifyDeAllocateTask(taskScheduler, ta13, true, null, null);
- verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class),
+ verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta14), any(Object.class),
eq(container2));
verify(rmClient, times(1)).releaseAssignedContainer(eq(container2.getId()));
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
@@ -675,27 +676,27 @@ public class TestContainerReuse {
// Container started
Container container3 = createContainer(2, "host3", resource1, priority1);
- taskSchedulerEventHandler.handleEvent(lrEvent5);
- taskSchedulerEventHandler.handleEvent(lrEvent6);
+ taskSchedulerManager.handleEvent(lrEvent5);
+ taskSchedulerManager.handleEvent(lrEvent6);
drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container3));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta15), any(Object.class),
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta15), any(Object.class),
eq(container3));
//Ensure task 6 (of vertex 1) is allocated to same container
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta15, container3.getId(), TaskAttemptState.SUCCEEDED, null,
null, 0));
drainableAppCallback.drain();
verifyDeAllocateTask(taskScheduler, ta15, true, null, null);
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3));
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta16), any(Object.class), eq(container3));
eventHandler.reset();
taskScheduler.shutdown();
- taskSchedulerEventHandler.close();
+ taskSchedulerManager.close();
}
@Test(timeout = 30000l)
@@ -721,7 +722,7 @@ public class TestContainerReuse {
doReturn(new Configuration(false)).when(appContext).getAMConf();
AMContainerMap amContainerMap = new AMContainerMap(
mock(ContainerHeartbeatHandler.class),
- mock(TaskAttemptListener.class),
+ mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appContext);
AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
doReturn(amContainerMap).when(appContext).getAllContainers();
@@ -730,18 +731,18 @@ public class TestContainerReuse {
doReturn(dagID).when(appContext).getCurrentDAGID();
doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
- TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
- new TaskSchedulerEventHandlerForTest(
+ TaskSchedulerManager taskSchedulerManagerReal =
+ new TaskSchedulerManagerForTest(
appContext, eventHandler, rmClient,
new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
- TaskSchedulerEventHandler taskSchedulerEventHandler =
- spy(taskSchedulerEventHandlerReal);
- taskSchedulerEventHandler.init(tezConf);
- taskSchedulerEventHandler.start();
+ TaskSchedulerManager taskSchedulerManager =
+ spy(taskSchedulerManagerReal);
+ taskSchedulerManager.init(tezConf);
+ taskSchedulerManager.start();
TaskSchedulerWithDrainableContext taskScheduler =
(TaskSchedulerWithDrainableContext)
- ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+ ((TaskSchedulerManagerForTest) taskSchedulerManager)
.getSpyTaskScheduler();
TaskSchedulerContextDrainable drainableAppCallback =
taskScheduler.getDrainableAppCallback();
@@ -772,7 +773,7 @@ public class TestContainerReuse {
// Send launch request for task 1 only, deterministic assignment to this task.
drainNotifier.set(false);
- taskSchedulerEventHandler.handleEvent(lrEvent11);
+ taskSchedulerManager.handleEvent(lrEvent11);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
Container container1 = createContainer(1, "randomHost", resource1, priority);
@@ -782,21 +783,21 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(
+ verify(taskSchedulerManager).taskAllocated(
eq(0), eq(ta11), any(Object.class), eq(container1));
// Send launch request for task2 (vertex2)
- taskSchedulerEventHandler.handleEvent(lrEvent12);
+ taskSchedulerManager.handleEvent(lrEvent12);
// Task assigned to container completed successfully.
// Container should not be immediately assigned to task 2
// until delay expires.
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta11, container1.getId(),
TaskAttemptState.SUCCEEDED, null, null, 0));
drainableAppCallback.drain();
verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
- verify(taskSchedulerEventHandler, times(0)).taskAllocated(
+ verify(taskSchedulerManager, times(0)).taskAllocated(
eq(0), eq(ta12), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -804,11 +805,11 @@ public class TestContainerReuse {
LOG.info("Sleeping to ensure that the scheduling loop runs");
Thread.sleep(3000l);
- verify(taskSchedulerEventHandler).taskAllocated(
+ verify(taskSchedulerManager).taskAllocated(
eq(0), eq(ta12), any(Object.class), eq(container1));
// TA12 completed.
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta12, container1.getId(),
TaskAttemptState.SUCCEEDED, null, null, 0));
drainableAppCallback.drain();
@@ -818,7 +819,7 @@ public class TestContainerReuse {
eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
taskScheduler.shutdown();
- taskSchedulerEventHandler.close();
+ taskSchedulerManager.close();
}
@Test(timeout = 30000l)
@@ -845,7 +846,7 @@ public class TestContainerReuse {
doReturn(new Configuration(false)).when(appContext).getAMConf();
AMContainerMap amContainerMap = new AMContainerMap(
mock(ContainerHeartbeatHandler.class),
- mock(TaskAttemptListener.class),
+ mock(TaskCommunicatorManagerInterface.class),
new ContainerContextMatcher(), appContext);
AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
doReturn(amContainerMap).when(appContext).getAllContainers();
@@ -855,17 +856,17 @@ public class TestContainerReuse {
doReturn(dagID).when(appContext).getCurrentDAGID();
doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
- TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
- new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
+ TaskSchedulerManager taskSchedulerManagerReal =
+ new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient,
new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
- TaskSchedulerEventHandler taskSchedulerEventHandler =
- spy(taskSchedulerEventHandlerReal);
- taskSchedulerEventHandler.init(tezConf);
- taskSchedulerEventHandler.start();
+ TaskSchedulerManager taskSchedulerManager =
+ spy(taskSchedulerManagerReal);
+ taskSchedulerManager.init(tezConf);
+ taskSchedulerManager.start();
TaskSchedulerWithDrainableContext taskScheduler =
(TaskSchedulerWithDrainableContext)
- ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+ ((TaskSchedulerManagerForTest) taskSchedulerManager)
.getSpyTaskScheduler();
TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
@@ -900,7 +901,7 @@ public class TestContainerReuse {
// Send launch request for task 1 onle, deterministic assignment to this task.
drainNotifier.set(false);
- taskSchedulerEventHandler.handleEvent(lrEvent11);
+ taskSchedulerManager.handleEvent(lrEvent11);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
Container container1 = createContainer(1, host1[0], resource1, priority1);
@@ -910,25 +911,25 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(
+ verify(taskSchedulerManager).taskAllocated(
eq(0), eq(ta11), any(Object.class), eq(container1));
// Send launch request for task2 (vertex2)
- taskSchedulerEventHandler.handleEvent(lrEvent21);
+ taskSchedulerManager.handleEvent(lrEvent21);
// Task assigned to container completed successfully.
// Container should be assigned to task21.
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta11, container1.getId(),
TaskAttemptState.SUCCEEDED, null, null, 0));
drainableAppCallback.drain();
verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
- verify(taskSchedulerEventHandler).taskAllocated(
+ verify(taskSchedulerManager).taskAllocated(
eq(0), eq(ta21), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
// Task 2 completes.
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta21, container1.getId(),
TaskAttemptState.SUCCEEDED, null, null, 0));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -939,7 +940,7 @@ public class TestContainerReuse {
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
taskScheduler.shutdown();
- taskSchedulerEventHandler.close();
+ taskSchedulerManager.close();
}
@Test(timeout = 30000l)
@@ -962,7 +963,7 @@ public class TestContainerReuse {
doReturn(new Configuration(false)).when(appContext).getAMConf();
ChangingDAGIDAnswer dagIDAnswer = new ChangingDAGIDAnswer(dagID1);
AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
- mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
+ mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
doReturn(amContainerMap).when(appContext).getAllContainers();
doReturn(amNodeTracker).when(appContext).getNodeTracker();
@@ -971,12 +972,13 @@ public class TestContainerReuse {
doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID();
doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
- TaskSchedulerEventHandler taskSchedulerEventHandlerReal = new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
- TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
- taskSchedulerEventHandler.init(tezConf);
- taskSchedulerEventHandler.start();
+ TaskSchedulerManager
+ taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
+ TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+ taskSchedulerManager.init(tezConf);
+ taskSchedulerManager.start();
- TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+ TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager)
.getSpyTaskScheduler();
TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
@@ -1013,10 +1015,10 @@ public class TestContainerReuse {
AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, dag1LRs);
drainNotifier.set(false);
- taskSchedulerEventHandler.handleEvent(lrEvent11);
+ taskSchedulerManager.handleEvent(lrEvent11);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainNotifier.set(false);
- taskSchedulerEventHandler.handleEvent(lrEvent12);
+ taskSchedulerManager.handleEvent(lrEvent12);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
Container container1 = createContainer(1, "host1", resource1, priority1);
@@ -1026,17 +1028,17 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1));
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1));
assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
// Task assigned to container completed successfully. Container should be re-used.
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null,
null, 0));
drainableAppCallback.drain();
verifyDeAllocateTask(taskScheduler, ta111, true, null, null);
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
@@ -1045,7 +1047,7 @@ public class TestContainerReuse {
// Task assigned to container completed successfully.
// Verify reuse across hosts.
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null,
null, 0));
drainableAppCallback.drain();
@@ -1074,14 +1076,14 @@ public class TestContainerReuse {
TaskAttempt ta212 = mock(TaskAttempt.class);
AMSchedulerEventTALaunchRequest lrEvent22 = createLaunchRequestEvent(taID212, ta212, resource1, host1, racks, priority1, dag2LRs);
- taskSchedulerEventHandler.handleEvent(lrEvent21);
- taskSchedulerEventHandler.handleEvent(lrEvent22);
+ taskSchedulerManager.handleEvent(lrEvent21);
+ taskSchedulerManager.handleEvent(lrEvent22);
drainableAppCallback.drain();
// TODO This is terrible, need a better way to ensure the scheduling loop has run
LOG.info("Sleeping to ensure that the scheduling loop runs");
Thread.sleep(6000l);
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class),
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class),
eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
@@ -1089,12 +1091,12 @@ public class TestContainerReuse {
assertEquals(2, assignEvent.getRemoteTaskLocalResources().size());
eventHandler.reset();
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta211, container1.getId(), TaskAttemptState.SUCCEEDED, null,
null, 0));
drainableAppCallback.drain();
verifyDeAllocateTask(taskScheduler, ta211, true, null, null);
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1));
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta212), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
@@ -1102,7 +1104,7 @@ public class TestContainerReuse {
eventHandler.reset();
taskScheduler.shutdown();
- taskSchedulerEventHandler.close();
+ taskSchedulerManager.close();
}
@Test(timeout = 30000l)
@@ -1125,7 +1127,7 @@ public class TestContainerReuse {
doReturn(new Configuration(false)).when(appContext).getAMConf();
ChangingDAGIDAnswer dagIDAnswer = new ChangingDAGIDAnswer(dagID1);
AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
- mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
+ mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
doReturn(amContainerMap).when(appContext).getAllContainers();
doReturn(amNodeTracker).when(appContext).getNodeTracker();
@@ -1134,14 +1136,14 @@ public class TestContainerReuse {
doAnswer(dagIDAnswer).when(appContext).getCurrentDAGID();
doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
- TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
- new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
+ TaskSchedulerManager taskSchedulerManagerReal =
+ new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient,
new ContainerContextMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
- TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
- taskSchedulerEventHandler.init(tezConf);
- taskSchedulerEventHandler.start();
+ TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+ taskSchedulerManager.init(tezConf);
+ taskSchedulerManager.start();
- TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+ TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext) ((TaskSchedulerManagerForTest) taskSchedulerManager)
.getSpyTaskScheduler();
TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
@@ -1185,10 +1187,10 @@ public class TestContainerReuse {
AMSchedulerEventTALaunchRequest lrEvent12 = createLaunchRequestEvent(taID112, ta112, resource1, host1, racks, priority1, v12LR);
drainNotifier.set(false);
- taskSchedulerEventHandler.handleEvent(lrEvent11);
+ taskSchedulerManager.handleEvent(lrEvent11);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainNotifier.set(false);
- taskSchedulerEventHandler.handleEvent(lrEvent12);
+ taskSchedulerManager.handleEvent(lrEvent12);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
Container container1 = createContainer(1, "host1", resource1, priority1);
@@ -1199,17 +1201,17 @@ public class TestContainerReuse {
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1));
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta111), any(Object.class), eq(container1));
assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
assertEquals(1, assignEvent.getRemoteTaskLocalResources().size());
// Task assigned to container completed successfully. Container should be re-used.
- taskSchedulerEventHandler.handleEvent(
+ taskSchedulerManager.handleEvent(
new AMSchedulerEventTAEnded(ta111, container1.getId(), TaskAttemptState.SUCCEEDED, null,
null, 0));
drainableAppCallback.drain();
verifyDeAllocateTask(taskScheduler, ta111, true, null, null);
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta112), any(Object.class), eq(container1));
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
assignEvent = (AMContainerEventAssignTA) eventHandler.verifyInvocation(AMContainerEventAssignTA.class);
@@ -1218,7 +1220,7 @@ public class TestContainerReuse {
// Task assigned to container completed successfully.
// Verify reuse across hosts.
- taskSchedulerEventHandler.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0));
+ taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta112, container1.getId(), TaskAttemptState.SUCCEEDED, null, null, 0));
drainableAppCallback.drain();
verifyDeAllocateTask(taskScheduler, ta112, true, null, null);
verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
@@ -1243,7 +1245,7 @@ public class TestContainerReuse {
AMSchedulerEventTALaunchRequest lrEvent21 = createLaunchRequestEvent(taID211, ta211, resource1,
host1, racks, priority1, v21LR);
- taskSchedulerEventHandler.handleEvent(lrEvent21);
+ taskSchedulerManager.handleEvent(lrEvent21);
drainableAppCallback.drain();
// TODO This is terrible, need a better way to ensure the scheduling loop has run
@@ -1254,11 +1256,11 @@ public class TestContainerReuse {
Thread.sleep(6000l);
verify(rmClient, times(1)).releaseAssignedContainer(eq(container1.getId()));
- verify(taskSchedulerEventHandler).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2));
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2));
eventHandler.reset();
taskScheduler.shutdown();
- taskSchedulerEventHandler.close();
+ taskSchedulerManager.close();
}
@Test(timeout = 10000l)
@@ -1281,7 +1283,7 @@ public class TestContainerReuse {
AppContext appContext = mock(AppContext.class);
doReturn(new Configuration(false)).when(appContext).getAMConf();
AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
- mock(TaskAttemptListener.class), new ContainerContextMatcher(), appContext);
+ mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
doReturn(amContainerMap).when(appContext).getAllContainers();
doReturn(amNodeTracker).when(appContext).getNodeTracker();
@@ -1290,15 +1292,15 @@ public class TestContainerReuse {
doReturn(dagID).when(appContext).getCurrentDAGID();
doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
- TaskSchedulerEventHandler taskSchedulerEventHandlerReal =
- new TaskSchedulerEventHandlerForTest(appContext, eventHandler, rmClient,
+ TaskSchedulerManager taskSchedulerManagerReal =
+ new TaskSchedulerManagerForTest(appContext, eventHandler, rmClient,
new AlwaysMatchesContainerMatcher(), TezUtils.createUserPayloadFromConf(tezConf));
- TaskSchedulerEventHandler taskSchedulerEventHandler = spy(taskSchedulerEventHandlerReal);
- taskSchedulerEventHandler.init(tezConf);
- taskSchedulerEventHandler.start();
+ TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+ taskSchedulerManager.init(tezConf);
+ taskSchedulerManager.start();
TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext)
- ((TaskSchedulerEventHandlerForTest) taskSchedulerEventHandler)
+ ((TaskSchedulerManagerForTest) taskSchedulerManager)
.getSpyTaskScheduler();
TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
AtomicBoolean drainNotifier = new AtomicBoolean(false);
@@ -1318,7 +1320,7 @@ public class TestContainerReuse {
TaskAttempt ta11 = mock(TaskAttempt.class);
AMSchedulerEventTALaunchRequest lrEvent1 = createLaunchRequestEvent(taID11, ta11, resource1,
host1, racks, priority1);
- taskSchedulerEventHandler.handleEvent(lrEvent1);
+ taskSchedulerManager.handleEvent(lrEvent1);
Container container1 = createContainer(1, "host1", resource1, priority1);
@@ -1326,10 +1328,10 @@ public class TestContainerReuse {
drainNotifier.set(false);
taskScheduler.onContainersAllocated(Collections.singletonList(container1));
drainableAppCallback.drain();
- verify(taskSchedulerEventHandler, times(0)).taskAllocated(eq(0), eq(ta11),
+ verify(taskSchedulerManager, times(0)).taskAllocated(eq(0), eq(ta11),
any(Object.class), eq(container1));
taskScheduler.shutdown();
- taskSchedulerEventHandler.close();
+ taskSchedulerManager.close();
}
private Container createContainer(int id, String host, Resource resource, Priority priority) {
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
deleted file mode 100644
index c85be6c..0000000
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ /dev/null
@@ -1,707 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.app.rm;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.event.Event;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tez.common.ContainerSignatureMatcher;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.NamedEntityDescriptor;
-import org.apache.tez.dag.api.TaskLocationHint;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.client.DAGClientServer;
-import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.app.ContainerContext;
-import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
-import org.apache.tez.dag.app.dag.TaskAttempt;
-import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
-import org.apache.tez.dag.app.dag.impl.TaskImpl;
-import org.apache.tez.dag.app.dag.impl.VertexImpl;
-import org.apache.tez.dag.app.rm.container.AMContainer;
-import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
-import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
-import org.apache.tez.dag.app.rm.container.AMContainerEventType;
-import org.apache.tez.dag.app.rm.container.AMContainerMap;
-import org.apache.tez.dag.app.rm.container.AMContainerState;
-import org.apache.tez.dag.app.web.WebUIService;
-import org.apache.tez.dag.records.TaskAttemptTerminationCause;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.runtime.api.impl.TaskSpec;
-import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
-import org.apache.tez.serviceplugins.api.TaskScheduler;
-import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-@SuppressWarnings("rawtypes")
-public class TestTaskSchedulerEventHandler {
-
- class TestEventHandler implements EventHandler{
- List<Event> events = Lists.newLinkedList();
- @Override
- public void handle(Event event) {
- events.add(event);
- }
- }
-
- class MockTaskSchedulerEventHandler extends TaskSchedulerEventHandler {
-
- final AtomicBoolean notify = new AtomicBoolean(false);
-
- public MockTaskSchedulerEventHandler(AppContext appContext,
- DAGClientServer clientService, EventHandler eventHandler,
- ContainerSignatureMatcher containerSignatureMatcher, WebUIService webUI) {
- super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI,
- Lists.newArrayList(new NamedEntityDescriptor("FakeDescriptor", null)), false);
- }
-
- @Override
- protected void instantiateSchedulers(String host, int port, String trackingUrl,
- AppContext appContext) {
- taskSchedulers[0] = mockTaskScheduler;
- taskSchedulerServiceWrappers[0] = new ServicePluginLifecycleAbstractService<>(taskSchedulers[0]);
- }
-
- @Override
- protected void notifyForTest() {
- synchronized (notify) {
- notify.set(true);
- notify.notifyAll();
- }
- }
-
- }
-
- AppContext mockAppContext;
- DAGClientServer mockClientService;
- TestEventHandler mockEventHandler;
- ContainerSignatureMatcher mockSigMatcher;
- MockTaskSchedulerEventHandler schedulerHandler;
- TaskScheduler mockTaskScheduler;
- AMContainerMap mockAMContainerMap;
- WebUIService mockWebUIService;
-
- @Before
- public void setup() {
- mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
- doReturn(new Configuration(false)).when(mockAppContext).getAMConf();
- mockClientService = mock(DAGClientServer.class);
- mockEventHandler = new TestEventHandler();
- mockSigMatcher = mock(ContainerSignatureMatcher.class);
- mockTaskScheduler = mock(TaskScheduler.class);
- mockAMContainerMap = mock(AMContainerMap.class);
- mockWebUIService = mock(WebUIService.class);
- when(mockAppContext.getAllContainers()).thenReturn(mockAMContainerMap);
- when(mockClientService.getBindAddress()).thenReturn(new InetSocketAddress(10000));
- schedulerHandler = new MockTaskSchedulerEventHandler(
- mockAppContext, mockClientService, mockEventHandler, mockSigMatcher, mockWebUIService);
- }
-
- @Test(timeout = 5000)
- public void testSimpleAllocate() throws Exception {
- Configuration conf = new Configuration(false);
- schedulerHandler.init(conf);
- schedulerHandler.start();
-
- TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class);
- TezTaskAttemptID mockAttemptId = mock(TezTaskAttemptID.class);
- when(mockAttemptId.getId()).thenReturn(0);
- when(mockTaskAttempt.getID()).thenReturn(mockAttemptId);
- Resource resource = Resource.newInstance(1024, 1);
- ContainerContext containerContext =
- new ContainerContext(new HashMap<String, LocalResource>(), new Credentials(),
- new HashMap<String, String>(), "");
- int priority = 10;
- TaskLocationHint locHint = TaskLocationHint.createTaskLocationHint(new HashSet<String>(), null);
-
- ContainerId mockCId = mock(ContainerId.class);
- Container container = mock(Container.class);
- when(container.getId()).thenReturn(mockCId);
-
- AMContainer mockAMContainer = mock(AMContainer.class);
- when(mockAMContainer.getContainerId()).thenReturn(mockCId);
- when(mockAMContainer.getState()).thenReturn(AMContainerState.IDLE);
-
- when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer);
-
- AMSchedulerEventTALaunchRequest lr =
- new AMSchedulerEventTALaunchRequest(mockAttemptId, resource, null, mockTaskAttempt, locHint,
- priority, containerContext, 0, 0, 0);
- schedulerHandler.taskAllocated(0, mockTaskAttempt, lr, container);
- assertEquals(2, mockEventHandler.events.size());
- assertTrue(mockEventHandler.events.get(1) instanceof AMContainerEventAssignTA);
- AMContainerEventAssignTA assignEvent =
- (AMContainerEventAssignTA) mockEventHandler.events.get(1);
- assertEquals(priority, assignEvent.getPriority());
- assertEquals(mockAttemptId, assignEvent.getTaskAttemptId());
- }
-
- @Test (timeout = 5000)
- public void testTaskBasedAffinity() throws Exception {
- Configuration conf = new Configuration(false);
- schedulerHandler.init(conf);
- schedulerHandler.start();
-
- TaskAttemptImpl mockTaskAttempt = mock(TaskAttemptImpl.class);
- TezTaskAttemptID taId = mock(TezTaskAttemptID.class);
- String affVertexName = "srcVertex";
- int affTaskIndex = 1;
- TaskLocationHint locHint = TaskLocationHint.createTaskLocationHint(affVertexName, affTaskIndex);
- VertexImpl affVertex = mock(VertexImpl.class);
- TaskImpl affTask = mock(TaskImpl.class);
- TaskAttemptImpl affAttempt = mock(TaskAttemptImpl.class);
- ContainerId affCId = mock(ContainerId.class);
- when(affVertex.getTotalTasks()).thenReturn(2);
- when(affVertex.getTask(affTaskIndex)).thenReturn(affTask);
- when(affTask.getSuccessfulAttempt()).thenReturn(affAttempt);
- when(affAttempt.getAssignedContainerID()).thenReturn(affCId);
- when(mockAppContext.getCurrentDAG().getVertex(affVertexName)).thenReturn(affVertex);
- Resource resource = Resource.newInstance(100, 1);
- AMSchedulerEventTALaunchRequest event = new AMSchedulerEventTALaunchRequest
- (taId, resource, null, mockTaskAttempt, locHint, 3, null, 0, 0, 0);
- schedulerHandler.notify.set(false);
- schedulerHandler.handle(event);
- synchronized (schedulerHandler.notify) {
- while (!schedulerHandler.notify.get()) {
- schedulerHandler.notify.wait();
- }
- }
-
- // verify mockTaskAttempt affinitized to expected affCId
- verify(mockTaskScheduler, times(1)).allocateTask(mockTaskAttempt, resource, affCId,
- Priority.newInstance(3), null, event);
-
- schedulerHandler.stop();
- schedulerHandler.close();
- }
-
- @Test (timeout = 5000)
- public void testContainerPreempted() throws IOException {
- Configuration conf = new Configuration(false);
- schedulerHandler.init(conf);
- schedulerHandler.start();
-
- String diagnostics = "Container preempted by RM.";
- TaskAttemptImpl mockTask = mock(TaskAttemptImpl.class);
- ContainerStatus mockStatus = mock(ContainerStatus.class);
- ContainerId mockCId = mock(ContainerId.class);
- AMContainer mockAMContainer = mock(AMContainer.class);
- when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer);
- when(mockAMContainer.getContainerId()).thenReturn(mockCId);
- when(mockStatus.getContainerId()).thenReturn(mockCId);
- when(mockStatus.getDiagnostics()).thenReturn(diagnostics);
- when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.PREEMPTED);
- schedulerHandler.containerCompleted(0, mockTask, mockStatus);
- assertEquals(1, mockEventHandler.events.size());
- Event event = mockEventHandler.events.get(0);
- assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
- AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
- assertEquals(mockCId, completedEvent.getContainerId());
- assertEquals("Container preempted externally. Container preempted by RM.",
- completedEvent.getDiagnostics());
- assertTrue(completedEvent.isPreempted());
- assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION,
- completedEvent.getTerminationCause());
- Assert.assertFalse(completedEvent.isDiskFailed());
-
- schedulerHandler.stop();
- schedulerHandler.close();
- }
-
- @Test (timeout = 5000)
- public void testContainerInternalPreempted() throws IOException {
- Configuration conf = new Configuration(false);
- schedulerHandler.init(conf);
- schedulerHandler.start();
-
- AMContainer mockAmContainer = mock(AMContainer.class);
- when(mockAmContainer.getTaskSchedulerIdentifier()).thenReturn(0);
- when(mockAmContainer.getContainerLauncherIdentifier()).thenReturn(0);
- when(mockAmContainer.getTaskCommunicatorIdentifier()).thenReturn(0);
- ContainerId mockCId = mock(ContainerId.class);
- verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId) any());
- when(mockAMContainerMap.get(mockCId)).thenReturn(mockAmContainer);
- schedulerHandler.preemptContainer(0, mockCId);
- verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId);
- assertEquals(1, mockEventHandler.events.size());
- Event event = mockEventHandler.events.get(0);
- assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
- AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
- assertEquals(mockCId, completedEvent.getContainerId());
- assertEquals("Container preempted internally", completedEvent.getDiagnostics());
- assertTrue(completedEvent.isPreempted());
- Assert.assertFalse(completedEvent.isDiskFailed());
- assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
- completedEvent.getTerminationCause());
-
- schedulerHandler.stop();
- schedulerHandler.close();
- }
-
- @Test (timeout = 5000)
- public void testContainerDiskFailed() throws IOException {
- Configuration conf = new Configuration(false);
- schedulerHandler.init(conf);
- schedulerHandler.start();
-
- String diagnostics = "NM disk failed.";
- TaskAttemptImpl mockTask = mock(TaskAttemptImpl.class);
- ContainerStatus mockStatus = mock(ContainerStatus.class);
- ContainerId mockCId = mock(ContainerId.class);
- AMContainer mockAMContainer = mock(AMContainer.class);
- when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer);
- when(mockAMContainer.getContainerId()).thenReturn(mockCId);
- when(mockStatus.getContainerId()).thenReturn(mockCId);
- when(mockStatus.getDiagnostics()).thenReturn(diagnostics);
- when(mockStatus.getExitStatus()).thenReturn(ContainerExitStatus.DISKS_FAILED);
- schedulerHandler.containerCompleted(0, mockTask, mockStatus);
- assertEquals(1, mockEventHandler.events.size());
- Event event = mockEventHandler.events.get(0);
- assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
- AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
- assertEquals(mockCId, completedEvent.getContainerId());
- assertEquals("Container disk failed. NM disk failed.",
- completedEvent.getDiagnostics());
- Assert.assertFalse(completedEvent.isPreempted());
- assertTrue(completedEvent.isDiskFailed());
- assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
- completedEvent.getTerminationCause());
-
- schedulerHandler.stop();
- schedulerHandler.close();
- }
-
- @Test (timeout = 5000)
- public void testContainerExceededPMem() throws IOException {
- Configuration conf = new Configuration(false);
- schedulerHandler.init(conf);
- schedulerHandler.start();
-
- String diagnostics = "Exceeded Physical Memory";
- TaskAttemptImpl mockTask = mock(TaskAttemptImpl.class);
- ContainerStatus mockStatus = mock(ContainerStatus.class);
- ContainerId mockCId = mock(ContainerId.class);
- AMContainer mockAMContainer = mock(AMContainer.class);
- when(mockAMContainerMap.get(mockCId)).thenReturn(mockAMContainer);
- when(mockAMContainer.getContainerId()).thenReturn(mockCId);
- when(mockStatus.getContainerId()).thenReturn(mockCId);
- when(mockStatus.getDiagnostics()).thenReturn(diagnostics);
- // use -104 rather than ContainerExitStatus.KILLED_EXCEEDED_PMEM because
- // ContainerExitStatus.KILLED_EXCEEDED_PMEM is only available after hadoop-2.5
- when(mockStatus.getExitStatus()).thenReturn(-104);
- schedulerHandler.containerCompleted(0, mockTask, mockStatus);
- assertEquals(1, mockEventHandler.events.size());
- Event event = mockEventHandler.events.get(0);
- assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
- AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
- assertEquals(mockCId, completedEvent.getContainerId());
- assertEquals("Container failed, exitCode=-104. Exceeded Physical Memory",
- completedEvent.getDiagnostics());
- Assert.assertFalse(completedEvent.isPreempted());
- Assert.assertFalse(completedEvent.isDiskFailed());
- assertEquals(TaskAttemptTerminationCause.CONTAINER_EXITED,
- completedEvent.getTerminationCause());
-
- schedulerHandler.stop();
- schedulerHandler.close();
- }
-
- @Test (timeout = 5000)
- public void testHistoryUrlConf() throws Exception {
- Configuration conf = schedulerHandler.appContext.getAMConf();
-
- // ensure history url is empty when timeline server is not the logging class
- conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://ui-host:9999");
- assertTrue("".equals(schedulerHandler.getHistoryUrl()));
-
- // ensure expansion of url happens
- conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
- "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService");
- final ApplicationId mockApplicationId = mock(ApplicationId.class);
- doReturn("TEST_APP_ID").when(mockApplicationId).toString();
- doReturn(mockApplicationId).when(mockAppContext).getApplicationID();
- assertTrue("http://ui-host:9999/#/tez-app/TEST_APP_ID"
- .equals(schedulerHandler.getHistoryUrl()));
-
- // ensure the trailing / in history url is handled
- conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://ui-host:9998/");
- assertTrue("http://ui-host:9998/#/tez-app/TEST_APP_ID"
- .equals(schedulerHandler.getHistoryUrl()));
-
- // ensure missing scheme in history url is handled
- conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "ui-host:9998/");
- Assert.assertTrue("http://ui-host:9998/#/tez-app/TEST_APP_ID"
- .equals(schedulerHandler.getHistoryUrl()));
-
- // handle bad template ex without begining /
- conf.set(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE,
- "__HISTORY_URL_BASE__#/somepath");
- assertTrue("http://ui-host:9998/#/somepath"
- .equals(schedulerHandler.getHistoryUrl()));
-
- conf.set(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE,
- "__HISTORY_URL_BASE__?viewPath=tez-app/__APPLICATION_ID__");
- conf.set(TezConfiguration.TEZ_HISTORY_URL_BASE, "http://localhost/ui/tez");
- assertTrue("http://localhost/ui/tez?viewPath=tez-app/TEST_APP_ID"
- .equals(schedulerHandler.getHistoryUrl()));
-
- }
-
- @Test(timeout = 5000)
- public void testNoSchedulerSpecified() throws IOException {
- try {
- new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler,
- mockSigMatcher, mockWebUIService, null, false);
- fail("Expecting an IllegalStateException with no schedulers specified");
- } catch (IllegalArgumentException e) {
- }
- }
-
- // Verified via statics
- @Test(timeout = 5000)
- public void testCustomTaskSchedulerSetup() throws IOException {
- Configuration conf = new Configuration(false);
- conf.set("testkey", "testval");
- UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(conf);
-
- String customSchedulerName = "fakeScheduler";
- List<NamedEntityDescriptor> taskSchedulers = new LinkedList<>();
- ByteBuffer bb = ByteBuffer.allocate(4);
- bb.putInt(0, 3);
- UserPayload userPayload = UserPayload.create(bb);
- taskSchedulers.add(
- new NamedEntityDescriptor(customSchedulerName, FakeTaskScheduler.class.getName())
- .setUserPayload(userPayload));
- taskSchedulers.add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
- .setUserPayload(defaultPayload));
-
- TSEHForMultipleSchedulersTest tseh =
- new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler,
- mockSigMatcher, mockWebUIService, taskSchedulers, false);
-
- tseh.init(conf);
- tseh.start();
-
- // Verify that the YARN task scheduler is installed by default
- assertTrue(tseh.getYarnSchedulerCreated());
- assertFalse(tseh.getUberSchedulerCreated());
- assertEquals(2, tseh.getNumCreateInvocations());
-
- // Verify the order of the schedulers
- assertEquals(customSchedulerName, tseh.getTaskSchedulerName(0));
- assertEquals(TezConstants.getTezYarnServicePluginName(), tseh.getTaskSchedulerName(1));
-
- // Verify the payload setup for the custom task scheduler
- assertNotNull(tseh.getTaskSchedulerContext(0));
- assertEquals(bb, tseh.getTaskSchedulerContext(0).getInitialUserPayload().getPayload());
-
- // Verify the payload on the yarn scheduler
- assertNotNull(tseh.getTaskSchedulerContext(1));
- Configuration parsed = TezUtils.createConfFromUserPayload(tseh.getTaskSchedulerContext(1).getInitialUserPayload());
- assertEquals("testval", parsed.get("testkey"));
- }
-
- @Test(timeout = 5000)
- public void testTaskSchedulerRouting() throws Exception {
- Configuration conf = new Configuration(false);
- UserPayload defaultPayload = TezUtils.createUserPayloadFromConf(conf);
-
- String customSchedulerName = "fakeScheduler";
- List<NamedEntityDescriptor> taskSchedulers = new LinkedList<>();
- ByteBuffer bb = ByteBuffer.allocate(4);
- bb.putInt(0, 3);
- UserPayload userPayload = UserPayload.create(bb);
- taskSchedulers.add(
- new NamedEntityDescriptor(customSchedulerName, FakeTaskScheduler.class.getName())
- .setUserPayload(userPayload));
- taskSchedulers.add(new NamedEntityDescriptor(TezConstants.getTezYarnServicePluginName(), null)
- .setUserPayload(defaultPayload));
-
- TSEHForMultipleSchedulersTest tseh =
- new TSEHForMultipleSchedulersTest(mockAppContext, mockClientService, mockEventHandler,
- mockSigMatcher, mockWebUIService, taskSchedulers, false);
-
- tseh.init(conf);
- tseh.start();
-
- // Verify that the YARN task scheduler is installed by default
- assertTrue(tseh.getYarnSchedulerCreated());
- assertFalse(tseh.getUberSchedulerCreated());
- assertEquals(2, tseh.getNumCreateInvocations());
-
- // Verify the order of the schedulers
- assertEquals(customSchedulerName, tseh.getTaskSchedulerName(0));
- assertEquals(TezConstants.getTezYarnServicePluginName(), tseh.getTaskSchedulerName(1));
-
- verify(tseh.getTestTaskScheduler(0)).initialize();
- verify(tseh.getTestTaskScheduler(0)).start();
-
- ApplicationId appId = ApplicationId.newInstance(1000, 1);
- TezDAGID dagId = TezDAGID.getInstance(appId, 1);
- TezVertexID vertexID = TezVertexID.getInstance(dagId, 1);
- TezTaskID taskId1 = TezTaskID.getInstance(vertexID, 1);
- TezTaskAttemptID attemptId11 = TezTaskAttemptID.getInstance(taskId1, 1);
- TezTaskID taskId2 = TezTaskID.getInstance(vertexID, 2);
- TezTaskAttemptID attemptId21 = TezTaskAttemptID.getInstance(taskId2, 1);
-
- Resource resource = Resource.newInstance(1024, 1);
-
- TaskAttempt mockTaskAttempt1 = mock(TaskAttempt.class);
- TaskAttempt mockTaskAttempt2 = mock(TaskAttempt.class);
-
- AMSchedulerEventTALaunchRequest launchRequest1 =
- new AMSchedulerEventTALaunchRequest(attemptId11, resource, mock(TaskSpec.class),
- mockTaskAttempt1, mock(TaskLocationHint.class), 1, mock(ContainerContext.class), 0, 0,
- 0);
-
- tseh.handle(launchRequest1);
-
- verify(tseh.getTestTaskScheduler(0)).allocateTask(eq(mockTaskAttempt1), eq(resource),
- any(String[].class), any(String[].class), any(Priority.class), any(Object.class),
- eq(launchRequest1));
-
- AMSchedulerEventTALaunchRequest launchRequest2 =
- new AMSchedulerEventTALaunchRequest(attemptId21, resource, mock(TaskSpec.class),
- mockTaskAttempt2, mock(TaskLocationHint.class), 1, mock(ContainerContext.class), 1, 0,
- 0);
- tseh.handle(launchRequest2);
- verify(tseh.getTestTaskScheduler(1)).allocateTask(eq(mockTaskAttempt2), eq(resource),
- any(String[].class), any(String[].class), any(Priority.class), any(Object.class),
- eq(launchRequest2));
- }
-
- private static class TSEHForMultipleSchedulersTest extends TaskSchedulerEventHandler {
-
- private final TaskScheduler yarnTaskScheduler;
- private final TaskScheduler uberTaskScheduler;
- private final AtomicBoolean uberSchedulerCreated = new AtomicBoolean(false);
- private final AtomicBoolean yarnSchedulerCreated = new AtomicBoolean(false);
- private final AtomicInteger numCreateInvocations = new AtomicInteger(0);
- private final Set<Integer> seenSchedulers = new HashSet<>();
- private final List<TaskSchedulerContext> taskSchedulerContexts = new LinkedList<>();
- private final List<String> taskSchedulerNames = new LinkedList<>();
- private final List<TaskScheduler> testTaskSchedulers = new LinkedList<>();
-
- public TSEHForMultipleSchedulersTest(AppContext appContext,
- DAGClientServer clientService,
- EventHandler eventHandler,
- ContainerSignatureMatcher containerSignatureMatcher,
- WebUIService webUI,
- List<NamedEntityDescriptor> schedulerDescriptors,
- boolean isPureLocalMode) {
- super(appContext, clientService, eventHandler, containerSignatureMatcher, webUI,
- schedulerDescriptors, isPureLocalMode);
- yarnTaskScheduler = mock(TaskScheduler.class);
- uberTaskScheduler = mock(TaskScheduler.class);
- }
-
- @Override
- TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
- AppContext appContext,
- NamedEntityDescriptor taskSchedulerDescriptor,
- long customAppIdIdentifier,
- int schedulerId) {
-
- numCreateInvocations.incrementAndGet();
- boolean added = seenSchedulers.add(schedulerId);
- assertTrue("Cannot add multiple schedulers with the same schedulerId", added);
- taskSchedulerNames.add(taskSchedulerDescriptor.getEntityName());
- return super.createTaskScheduler(host, port, trackingUrl, appContext, taskSchedulerDescriptor,
- customAppIdIdentifier, schedulerId);
- }
-
- @Override
- TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext rawContext) {
- // Avoid wrapping in threads
- return rawContext;
- }
-
- @Override
- TaskScheduler createYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext, int schedulerId) {
- taskSchedulerContexts.add(taskSchedulerContext);
- testTaskSchedulers.add(yarnTaskScheduler);
- yarnSchedulerCreated.set(true);
- return yarnTaskScheduler;
- }
-
- @Override
- TaskScheduler createUberTaskScheduler(TaskSchedulerContext taskSchedulerContext, int schedulerId) {
- taskSchedulerContexts.add(taskSchedulerContext);
- uberSchedulerCreated.set(true);
- testTaskSchedulers.add(yarnTaskScheduler);
- return uberTaskScheduler;
- }
-
- @Override
- TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext,
- NamedEntityDescriptor taskSchedulerDescriptor, int schedulerId) {
- taskSchedulerContexts.add(taskSchedulerContext);
- TaskScheduler taskScheduler = spy(super.createCustomTaskScheduler(taskSchedulerContext, taskSchedulerDescriptor, schedulerId));
- testTaskSchedulers.add(taskScheduler);
- return taskScheduler;
- }
-
- @Override
- // Inline handling of events.
- public void handle(AMSchedulerEvent event) {
- handleEvent(event);
- }
-
- public boolean getUberSchedulerCreated() {
- return uberSchedulerCreated.get();
- }
-
- public boolean getYarnSchedulerCreated() {
- return yarnSchedulerCreated.get();
- }
-
- public int getNumCreateInvocations() {
- return numCreateInvocations.get();
- }
-
- public TaskSchedulerContext getTaskSchedulerContext(int schedulerId) {
- return taskSchedulerContexts.get(schedulerId);
- }
-
- public String getTaskSchedulerName(int schedulerId) {
- return taskSchedulerNames.get(schedulerId);
- }
-
- public TaskScheduler getTestTaskScheduler(int schedulerId) {
- return testTaskSchedulers.get(schedulerId);
- }
- }
-
- public static class FakeTaskScheduler extends TaskScheduler {
-
- public FakeTaskScheduler(
- TaskSchedulerContext taskSchedulerContext) {
- super(taskSchedulerContext);
- }
-
- @Override
- public Resource getAvailableResources() {
- return null;
- }
-
- @Override
- public int getClusterNodeCount() {
- return 0;
- }
-
- @Override
- public void dagComplete() {
-
- }
-
- @Override
- public Resource getTotalResources() {
- return null;
- }
-
- @Override
- public void blacklistNode(NodeId nodeId) {
-
- }
-
- @Override
- public void unblacklistNode(NodeId nodeId) {
-
- }
-
- @Override
- public void allocateTask(Object task, Resource capability, String[] hosts, String[] racks,
- Priority priority, Object containerSignature, Object clientCookie) {
-
- }
-
- @Override
- public void allocateTask(Object task, Resource capability, ContainerId containerId,
- Priority priority, Object containerSignature, Object clientCookie) {
-
- }
-
- @Override
- public boolean deallocateTask(Object task, boolean taskSucceeded,
- TaskAttemptEndReason endReason,
- String diagnostics) {
- return false;
- }
-
- @Override
- public Object deallocateContainer(ContainerId containerId) {
- return null;
- }
-
- @Override
- public void setShouldUnregister() {
-
- }
-
- @Override
- public boolean hasUnregistered() {
- return false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/8b278ea8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index c13ca5a..b1bc491 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -125,19 +125,19 @@ class TestTaskSchedulerHelpers {
}
// Overrides start / stop. Will be controlled without the extra event handling thread.
- static class TaskSchedulerEventHandlerForTest extends
- TaskSchedulerEventHandler {
+ static class TaskSchedulerManagerForTest extends
+ TaskSchedulerManager {
private TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync;
private ContainerSignatureMatcher containerSignatureMatcher;
private UserPayload defaultPayload;
@SuppressWarnings("rawtypes")
- public TaskSchedulerEventHandlerForTest(AppContext appContext,
- EventHandler eventHandler,
- TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
- ContainerSignatureMatcher containerSignatureMatcher,
- UserPayload defaultPayload) {
+ public TaskSchedulerManagerForTest(AppContext appContext,
+ EventHandler eventHandler,
+ TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync,
+ ContainerSignatureMatcher containerSignatureMatcher,
+ UserPayload defaultPayload) {
super(appContext, null, eventHandler, containerSignatureMatcher, null,
Lists.newArrayList(new NamedEntityDescriptor("FakeScheduler", null)),
false);