You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/11/02 17:15:10 UTC
tez git commit: TEZ-3247. Add more unit test coverage for container
reuse. (Harish Jaiprakash via hitesh)
Repository: tez
Updated Branches:
refs/heads/master a328d469d -> 2e121ec7e
TEZ-3247. Add more unit test coverage for container reuse. (Harish Jaiprakash via hitesh)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2e121ec7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2e121ec7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2e121ec7
Branch: refs/heads/master
Commit: 2e121ec7eac5c3723b419856e8d1f3289f4feae7
Parents: a328d46
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Nov 2 10:10:13 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Nov 2 10:10:13 2016 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/rm/TestContainerReuse.java | 321 +++++++++++++++++++
2 files changed, 322 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/2e121ec7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4beb80..733a66e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3247. Add more unit test coverage for container reuse.
TEZ-3215. Support for MultipleOutputs.
TEZ-3097. Flaky test: TestCommit.testDAGCommitStartedEventFail_OnDAGSuccess.
TEZ-3487. Improvements in travis yml file to get builds to work.
http://git-wip-us.apache.org/repos/asf/tez/blob/2e121ec7/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index f21de3e..7e9e9ab 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -96,6 +96,7 @@ import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -1275,6 +1276,35 @@ public class TestContainerReuse {
verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2));
eventHandler.reset();
+ taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta211, container2.getId(),
+ TaskAttemptState.SUCCEEDED, null, null, 0));
+ drainableAppCallback.drain();
+ verifyDeAllocateTask(taskScheduler, ta211, true, null, null);
+ verify(rmClient, times(0)).releaseAssignedContainer(eq(container2.getId()));
+ eventHandler.reset();
+
+
+ // Setup DAG3 with a subset of resources from DAG2 (container should be reused).
+ TezDAGID dagID3 = TezDAGID.getInstance("0", 3, 0);
+ dagIDAnswer.setDAGID(dagID3);
+
+ Map<String, LocalResource> v31LR = Collections.singletonMap(rsrc1, lr1);
+
+ // dag3, vertex1, task1, ta1
+ TezTaskAttemptID taID311 = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(TezVertexID.getInstance(dagID3, 1), 1), 1);
+ TaskAttempt ta311 = mock(TaskAttempt.class);
+ doReturn(taID311).when(ta311).getID();
+ doReturn("Mock for TA " + taID311).when(ta311).toString();
+ AMSchedulerEventTALaunchRequest lrEvent31 = createLaunchRequestEvent(taID311, ta311, resource1,
+ host1, racks, priority1, v31LR);
+
+ taskSchedulerManager.handleEvent(lrEvent31);
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta311), any(Object.class), eq(container2));
+ eventHandler.reset();
+
taskScheduler.shutdown();
taskSchedulerManager.close();
}
@@ -1348,6 +1378,289 @@ public class TestContainerReuse {
taskSchedulerManager.close();
}
+ @Test(timeout=5000)
+ public void testDifferentResourceContainerReuse() throws Exception {
+ Configuration tezConf = new Configuration(new YarnConfiguration());
+ tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+ tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
+ tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
+ tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
+ tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
+ // Verifies a finished task's container is reused by the pending request with the same resource and host.
+ CapturingEventHandler eventHandler = new CapturingEventHandler();
+ TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
+
+ AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
+ TezAMRMClientAsync<CookieContainerRequest> rmClient =
+ spy(new AMRMClientAsyncForTest(rmClientCore, 100));
+
+ AppContext appContext = mock(AppContext.class);
+ doReturn(new Configuration(false)).when(appContext).getAMConf();
+ AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
+ mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
+ AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+ doReturn(amContainerMap).when(appContext).getAllContainers();
+ doReturn(amNodeTracker).when(appContext).getNodeTracker();
+ doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
+ doReturn(dagID).when(appContext).getCurrentDAGID();
+ doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
+
+ TaskSchedulerManager taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext,
+ eventHandler, rmClient, new AlwaysMatchesContainerMatcher(), // presumably accepts any launch context; confirm
+ TezUtils.createUserPayloadFromConf(tezConf));
+ TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+ taskSchedulerManager.init(tezConf);
+ taskSchedulerManager.start();
+
+ TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext)
+ ((TaskSchedulerManagerForTest) taskSchedulerManager).getSpyTaskScheduler();
+ TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+ AtomicBoolean drainNotifier = new AtomicBoolean(false);
+ taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
+ // resource1 requests (ta11, ta12) target host1; resource2 requests (ta13, ta14) target host2.
+ Resource resource1 = Resource.newInstance(1024, 1);
+ Resource resource2 = Resource.newInstance(2048, 1);
+ String[] host1 = {"host1"};
+ String[] host2 = {"host2"};
+
+ String []racks = {"/default-rack"};
+ Priority priority1 = Priority.newInstance(1);
+
+ TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1);
+
+ //Vertex 1, Task 1, Attempt 1, host1, resource1
+ TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 1), 1);
+ TaskAttempt ta11 = mock(TaskAttempt.class);
+ AMSchedulerEventTALaunchRequest lrEvent1 = createLaunchRequestEvent(
+ taID11, ta11, resource1, host1, racks, priority1);
+
+ //Vertex 1, Task 2, Attempt 1, host1, resource1
+ TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 2), 1);
+ TaskAttempt ta12 = mock(TaskAttempt.class);
+ AMSchedulerEventTALaunchRequest lrEvent2 = createLaunchRequestEvent(
+ taID12, ta12, resource1, host1, racks, priority1);
+
+ //Vertex 1, Task 3, Attempt 1, host2, resource2
+ TezTaskAttemptID taID13 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 3), 1);
+ TaskAttempt ta13 = mock(TaskAttempt.class);
+ AMSchedulerEventTALaunchRequest lrEvent3 = createLaunchRequestEvent(
+ taID13, ta13, resource2, host2, racks, priority1);
+
+ //Vertex 1, Task 4, Attempt 1, host2, resource2
+ TezTaskAttemptID taID14 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 4), 1);
+ TaskAttempt ta14 = mock(TaskAttempt.class);
+ AMSchedulerEventTALaunchRequest lrEvent4 = createLaunchRequestEvent(
+ taID14, ta14, resource2, host2, racks, priority1);
+
+ taskSchedulerManager.handleEvent(lrEvent1);
+ taskSchedulerManager.handleEvent(lrEvent2);
+ taskSchedulerManager.handleEvent(lrEvent3);
+ taskSchedulerManager.handleEvent(lrEvent4);
+
+ Container container1 = createContainer(1, "host1", resource1, priority1);
+ Container container2 = createContainer(2, "host2", resource2, priority1);
+
+ // container1 (resource1 on host1) allocated, should start ta11
+ taskScheduler.onContainersAllocated(Collections.singletonList(container1));
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1));
+
+ // container2 (resource2 on host2) allocated, should start ta13
+ taskScheduler.onContainersAllocated(Collections.singletonList(container2));
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2));
+
+ // ta11 finished, container1 should be reused for ta12 (same resource and host), not released
+ taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(),
+ TaskAttemptState.SUCCEEDED, null, null, 0));
+ drainableAppCallback.drain();
+ verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1));
+ verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+ eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+ eventHandler.reset();
+
+ // ta13 finished, container2 should be reused for ta14 (same resource and host), not released
+ taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(),
+ TaskAttemptState.SUCCEEDED, null, null, 0));
+ drainableAppCallback.drain();
+ verifyDeAllocateTask(taskScheduler, ta13, true, null, null);
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2));
+ verify(rmClient, times(0)).releaseAssignedContainer(eq(container2.getId()));
+ eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+ eventHandler.reset();
+
+ // ta12 finished with no matching requests pending, should release container1
+ taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(),
+ TaskAttemptState.SUCCEEDED, null, null, 0));
+ drainableAppCallback.drain();
+ verifyDeAllocateTask(taskScheduler, ta12, true, null, null);
+ verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
+ eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
+ eventHandler.reset();
+
+ // ta14 finished with no requests pending, should release container2.
+ taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(),
+ TaskAttemptState.SUCCEEDED, null, null, 0));
+ drainableAppCallback.drain();
+ verifyDeAllocateTask(taskScheduler, ta14, true, null, null);
+ verify(rmClient).releaseAssignedContainer(eq(container2.getId()));
+ eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
+ eventHandler.reset();
+
+ taskScheduler.shutdown();
+ taskSchedulerManager.close();
+ }
+
+ @Test(timeout=5000)
+ public void testEnvironmentVarsContainerReuse() throws Exception {
+ Configuration tezConf = new Configuration(new YarnConfiguration());
+ tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+ tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
+ tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
+ tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
+ tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
+ // Verifies, with a real ContainerContextMatcher, that task environments alone decide container reuse.
+ CapturingEventHandler eventHandler = new CapturingEventHandler();
+ TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
+
+ AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
+ TezAMRMClientAsync<CookieContainerRequest> rmClient =
+ spy(new AMRMClientAsyncForTest(rmClientCore, 100));
+
+ AppContext appContext = mock(AppContext.class);
+ doReturn(new Configuration(false)).when(appContext).getAMConf();
+ AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
+ mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
+ AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+ doReturn(amContainerMap).when(appContext).getAllContainers();
+ doReturn(amNodeTracker).when(appContext).getNodeTracker();
+ doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
+ doReturn(dagID).when(appContext).getCurrentDAGID();
+ doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
+
+ TaskSchedulerManager taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext,
+ eventHandler, rmClient, new ContainerContextMatcher(),
+ TezUtils.createUserPayloadFromConf(tezConf));
+ TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+ taskSchedulerManager.init(tezConf);
+ taskSchedulerManager.start();
+
+ TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext)
+ ((TaskSchedulerManagerForTest) taskSchedulerManager).getSpyTaskScheduler();
+ TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+ AtomicBoolean drainNotifier = new AtomicBoolean(false);
+ taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
+
+ Resource resource1 = Resource.newInstance(1024, 1);
+ String[] host1 = {"host1"};
+
+ String []racks = {"/default-rack"};
+ Priority priority1 = Priority.newInstance(1);
+
+ TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1);
+
+ // Every request and container uses resource1 on host1, so only the environments differ:
+ // env3 is a subset of env1, env2 shares no keys with env1, env4 adds env4=val4 to env1=val1.
+ Map<String, String> env1 = ImmutableMap.of("env1", "val1", "env2", "val2");
+ Map<String, String> env2 = ImmutableMap.of("env3", "val3", "env4", "val4");
+ Map<String, String> env3 = ImmutableMap.of("env1", "val1");
+ Map<String, String> env4 = ImmutableMap.of("env1", "val1", "env4", "val4");
+
+ //Vertex 1, Task 1, Attempt 1, host1, env1
+ TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 1), 1);
+ TaskAttempt ta11 = mock(TaskAttempt.class);
+ AMSchedulerEventTALaunchRequest lrEvent1 = createLaunchRequestEvent(
+ taID11, ta11, resource1, host1, racks, priority1, "", env1);
+
+ //Vertex 1, Task 2, Attempt 1, host1, env2
+ TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 2), 1);
+ TaskAttempt ta12 = mock(TaskAttempt.class);
+ AMSchedulerEventTALaunchRequest lrEvent2 = createLaunchRequestEvent(
+ taID12, ta12, resource1, host1, racks, priority1, "", env2);
+
+ //Vertex 1, Task 3, Attempt 1, host1, env3
+ TezTaskAttemptID taID13 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 3), 1);
+ TaskAttempt ta13 = mock(TaskAttempt.class);
+ AMSchedulerEventTALaunchRequest lrEvent3 = createLaunchRequestEvent(
+ taID13, ta13, resource1, host1, racks, priority1, "", env3);
+
+ //Vertex 1, Task 4, Attempt 1, host1, env4
+ TezTaskAttemptID taID14 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 4), 1);
+ TaskAttempt ta14 = mock(TaskAttempt.class);
+ AMSchedulerEventTALaunchRequest lrEvent4 = createLaunchRequestEvent(
+ taID14, ta14, resource1, host1, racks, priority1, "", env4);
+
+ taskSchedulerManager.handleEvent(lrEvent1);
+ taskSchedulerManager.handleEvent(lrEvent2);
+ taskSchedulerManager.handleEvent(lrEvent3);
+ taskSchedulerManager.handleEvent(lrEvent4);
+
+ Container container1 = createContainer(1, "host1", resource1, priority1);
+ Container container2 = createContainer(2, "host1", resource1, priority1);
+ Container container3 = createContainer(3, "host1", resource1, priority1);
+
+ // One container allocated, should start ta11 (env1) on container1
+ taskScheduler.onContainersAllocated(Collections.singletonList(container1));
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1));
+
+ // finish ta11, container1 should be reused for ta13 (env3 is a subset of env1)
+ taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(),
+ TaskAttemptState.SUCCEEDED, null, null, 0));
+ drainableAppCallback.drain();
+ verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container1));
+ verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+ eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+ eventHandler.reset();
+
+ // ta13 finished, neither ta12 (env2) nor ta14 (env4) may reuse container1, should release it
+ taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(),
+ TaskAttemptState.SUCCEEDED, null, null, 0));
+ drainableAppCallback.drain();
+ verifyDeAllocateTask(taskScheduler, ta13, true, null, null);
+ verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
+ eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
+ eventHandler.reset();
+
+ // Second container allocated, should start ta12 (env2).
+ taskScheduler.onContainersAllocated(Collections.singletonList(container2));
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container2));
+
+ // ta12 finished, ta14 (env4) may not reuse container2, should release container2
+ taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta12, container2.getId(),
+ TaskAttemptState.SUCCEEDED, null, null, 0));
+ drainableAppCallback.drain();
+ verifyDeAllocateTask(taskScheduler, ta12, true, null, null);
+ verify(rmClient).releaseAssignedContainer(eq(container2.getId()));
+ eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
+ eventHandler.reset();
+
+ // Third container allocated, should start ta14 (env4).
+ taskScheduler.onContainersAllocated(Collections.singletonList(container3));
+ TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+ drainableAppCallback.drain();
+ verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container3));
+
+ // ta14 finished with no requests pending, should release container3
+ taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta14, container3.getId(),
+ TaskAttemptState.SUCCEEDED, null, null, 0));
+ drainableAppCallback.drain();
+ verifyDeAllocateTask(taskScheduler, ta14, true, null, null);
+ verify(rmClient).releaseAssignedContainer(eq(container3.getId()));
+ eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
+ eventHandler.reset();
+
+ taskScheduler.shutdown();
+ taskSchedulerManager.close();
+ }
+
private Container createContainer(int id, String host, Resource resource, Priority priority) {
@SuppressWarnings("deprecation")
ContainerId containerID = ContainerId.newInstance(
@@ -1400,6 +1713,14 @@ public class TestContainerReuse {
new ContainerContext(localResources, new Credentials(), new HashMap<String, String>(), jvmOpts));
}
+ private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(TezTaskAttemptID taID,
+ TaskAttempt ta, Resource capability, String[] hosts, String[] racks, Priority priority,
+ String jvmOpts, Map<String, String> environment) { // no local resources; caller's env + JVM opts
+ ContainerContext context = new ContainerContext(new HashMap<String, LocalResource>(),
+ new Credentials(), environment, jvmOpts);
+ return createLaunchRequestEvent(taID, ta, capability, hosts, racks, priority, context);
+ }
+
private static class ChangingDAGIDAnswer implements Answer<TezDAGID> {
private TezDAGID dagID;