You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2016/11/02 17:15:10 UTC

tez git commit: TEZ-3247. Add more unit test coverage for container reuse. (Harish Jaiprakash via hitesh)

Repository: tez
Updated Branches:
  refs/heads/master a328d469d -> 2e121ec7e


TEZ-3247. Add more unit test coverage for container reuse. (Harish Jaiprakash via hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/2e121ec7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/2e121ec7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/2e121ec7

Branch: refs/heads/master
Commit: 2e121ec7eac5c3723b419856e8d1f3289f4feae7
Parents: a328d46
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed Nov 2 10:10:13 2016 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed Nov 2 10:10:13 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/dag/app/rm/TestContainerReuse.java      | 321 +++++++++++++++++++
 2 files changed, 322 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/2e121ec7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4beb80..733a66e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3247. Add more unit test coverage for container reuse.
   TEZ-3215. Support for MultipleOutputs.
   TEZ-3097. Flaky test: TestCommit.testDAGCommitStartedEventFail_OnDAGSuccess.
   TEZ-3487. Improvements in travis yml file to get builds to work.

http://git-wip-us.apache.org/repos/asf/tez/blob/2e121ec7/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
index f21de3e..7e9e9ab 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestContainerReuse.java
@@ -96,6 +96,7 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -1275,6 +1276,35 @@ public class TestContainerReuse {
     verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta211), any(Object.class), eq(container2));
     eventHandler.reset();
 
+    taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta211, container2.getId(),
+        TaskAttemptState.SUCCEEDED, null, null, 0));
+    drainableAppCallback.drain();
+    verifyDeAllocateTask(taskScheduler, ta211, true, null, null);
+    verify(rmClient, times(0)).releaseAssignedContainer(eq(container2.getId()));
+    eventHandler.reset();
+
+
+    // Setup DAG3 with a subset of resources from DAG2 (container should be reused).
+    TezDAGID dagID3 = TezDAGID.getInstance("0", 3, 0);
+    dagIDAnswer.setDAGID(dagID3);
+
+    Map<String, LocalResource> v31LR = Collections.singletonMap(rsrc1, lr1);
+
+    // dag3, vertex1, task1, ta1
+    TezTaskAttemptID taID311 = TezTaskAttemptID.getInstance(
+        TezTaskID.getInstance(TezVertexID.getInstance(dagID3, 1), 1), 1);
+    TaskAttempt ta311 = mock(TaskAttempt.class);
+    doReturn(taID311).when(ta311).getID();
+    doReturn("Mock for TA " + taID311).when(ta311).toString();
+    AMSchedulerEventTALaunchRequest lrEvent31 = createLaunchRequestEvent(taID311, ta311, resource1,
+        host1, racks, priority1, v31LR);
+
+    taskSchedulerManager.handleEvent(lrEvent31);
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta311), any(Object.class), eq(container2));
+    eventHandler.reset();
+
     taskScheduler.shutdown();
     taskSchedulerManager.close();
   }
@@ -1348,6 +1378,289 @@ public class TestContainerReuse {
     taskSchedulerManager.close();
   }
 
+  @Test(timeout=5000)
+  public void testDifferentResourceContainerReuse() throws Exception {
+    Configuration tezConf = new Configuration(new YarnConfiguration());
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
+
+    CapturingEventHandler eventHandler = new CapturingEventHandler();
+    TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
+
+    AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
+    TezAMRMClientAsync<CookieContainerRequest> rmClient =
+        spy(new AMRMClientAsyncForTest(rmClientCore, 100));
+
+    AppContext appContext = mock(AppContext.class);
+    doReturn(new Configuration(false)).when(appContext).getAMConf();
+    AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
+        mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+    doReturn(amContainerMap).when(appContext).getAllContainers();
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
+    doReturn(dagID).when(appContext).getCurrentDAGID();
+    doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
+
+    TaskSchedulerManager taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext,
+        eventHandler, rmClient, new AlwaysMatchesContainerMatcher(),
+        TezUtils.createUserPayloadFromConf(tezConf));
+    TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+    taskSchedulerManager.init(tezConf);
+    taskSchedulerManager.start();
+
+    TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext)
+        ((TaskSchedulerManagerForTest) taskSchedulerManager).getSpyTaskScheduler();
+    TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+    AtomicBoolean drainNotifier = new AtomicBoolean(false);
+    taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
+
+    Resource resource1 = Resource.newInstance(1024, 1);
+    Resource resource2 = Resource.newInstance(2048, 1);
+    String[] host1 = {"host1"};
+    String[] host2 = {"host2"};
+
+    String []racks = {"/default-rack"};
+    Priority priority1 = Priority.newInstance(1);
+
+    TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1);
+
+    // Vertex 1, Task 1, Attempt 1: requests resource1 on host1
+    TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 1), 1);
+    TaskAttempt ta11 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent1 = createLaunchRequestEvent(
+        taID11, ta11, resource1, host1, racks, priority1);
+
+    // Vertex 1, Task 2, Attempt 1: requests resource1 on host1
+    TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 2), 1);
+    TaskAttempt ta12 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent2 = createLaunchRequestEvent(
+        taID12, ta12, resource1, host1, racks, priority1);
+
+    // Vertex 1, Task 3, Attempt 1: requests resource2 on host2
+    TezTaskAttemptID taID13 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 3), 1);
+    TaskAttempt ta13 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent3 = createLaunchRequestEvent(
+        taID13, ta13, resource2, host2, racks, priority1);
+
+    // Vertex 1, Task 4, Attempt 1: requests resource2 on host2
+    TezTaskAttemptID taID14 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 4), 1);
+    TaskAttempt ta14 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent4 = createLaunchRequestEvent(
+        taID14, ta14, resource2, host2, racks, priority1);
+
+    taskSchedulerManager.handleEvent(lrEvent1);
+    taskSchedulerManager.handleEvent(lrEvent2);
+    taskSchedulerManager.handleEvent(lrEvent3);
+    taskSchedulerManager.handleEvent(lrEvent4);
+
+    Container container1 = createContainer(1, "host1", resource1, priority1);
+    Container container2 = createContainer(2, "host2", resource2, priority1);
+
+    // container1 (host1, resource1) allocated, should start ta11
+    taskScheduler.onContainersAllocated(Collections.singletonList(container1));
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1));
+
+    // container2 (host2, resource2) allocated, should start ta13
+    taskScheduler.onContainersAllocated(Collections.singletonList(container2));
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container2));
+
+    // ta11 finished, container1 should be reused for ta12 (no release, no stop request)
+    taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(),
+        TaskAttemptState.SUCCEEDED, null, null, 0));
+    drainableAppCallback.drain();
+    verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container1));
+    verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+    eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    // ta13 finished, container2 should be reused for ta14 (no release, no stop request)
+    taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta13, container2.getId(),
+        TaskAttemptState.SUCCEEDED, null, null, 0));
+    drainableAppCallback.drain();
+    verifyDeAllocateTask(taskScheduler, ta13, true, null, null);
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container2));
+    verify(rmClient, times(0)).releaseAssignedContainer(eq(container2.getId()));
+    eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    // ta12 finished and no pending requests remain, should release container1
+    taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta12, container1.getId(),
+        TaskAttemptState.SUCCEEDED, null, null, 0));
+    drainableAppCallback.drain();
+    verifyDeAllocateTask(taskScheduler, ta12, true, null, null);
+    verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
+    eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    // ta14 finished and no pending requests remain, should release container2
+    taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta14, container2.getId(),
+        TaskAttemptState.SUCCEEDED, null, null, 0));
+    drainableAppCallback.drain();
+    verifyDeAllocateTask(taskScheduler, ta14, true, null, null);
+    verify(rmClient).releaseAssignedContainer(eq(container2.getId()));
+    eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    taskScheduler.shutdown();
+    taskSchedulerManager.close();
+  }
+
+  @Test(timeout=5000)
+  public void testEnvironmentVarsContainerReuse() throws Exception {
+    Configuration tezConf = new Configuration(new YarnConfiguration());
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, true);
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED, true);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
+    tezConf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
+
+    CapturingEventHandler eventHandler = new CapturingEventHandler();
+    TezDAGID dagID = TezDAGID.getInstance("0", 0, 0);
+
+    AMRMClient<CookieContainerRequest> rmClientCore = new AMRMClientForTest();
+    TezAMRMClientAsync<CookieContainerRequest> rmClient =
+        spy(new AMRMClientAsyncForTest(rmClientCore, 100));
+
+    AppContext appContext = mock(AppContext.class);
+    doReturn(new Configuration(false)).when(appContext).getAMConf();
+    AMContainerMap amContainerMap = new AMContainerMap(mock(ContainerHeartbeatHandler.class),
+        mock(TaskCommunicatorManagerInterface.class), new ContainerContextMatcher(), appContext);
+    AMNodeTracker amNodeTracker = new AMNodeTracker(eventHandler, appContext);
+    doReturn(amContainerMap).when(appContext).getAllContainers();
+    doReturn(amNodeTracker).when(appContext).getNodeTracker();
+    doReturn(DAGAppMasterState.RUNNING).when(appContext).getAMState();
+    doReturn(dagID).when(appContext).getCurrentDAGID();
+    doReturn(mock(ClusterInfo.class)).when(appContext).getClusterInfo();
+
+    TaskSchedulerManager taskSchedulerManagerReal = new TaskSchedulerManagerForTest(appContext,
+        eventHandler, rmClient, new ContainerContextMatcher(),
+        TezUtils.createUserPayloadFromConf(tezConf));
+    TaskSchedulerManager taskSchedulerManager = spy(taskSchedulerManagerReal);
+    taskSchedulerManager.init(tezConf);
+    taskSchedulerManager.start();
+
+    TaskSchedulerWithDrainableContext taskScheduler = (TaskSchedulerWithDrainableContext)
+        ((TaskSchedulerManagerForTest) taskSchedulerManager).getSpyTaskScheduler();
+    TaskSchedulerContextDrainable drainableAppCallback = taskScheduler.getDrainableAppCallback();
+    AtomicBoolean drainNotifier = new AtomicBoolean(false);
+    taskScheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
+
+    Resource resource1 = Resource.newInstance(1024, 1);
+    String[] host1 = {"host1"};
+
+    String []racks = {"/default-rack"};
+    Priority priority1 = Priority.newInstance(1);
+
+    TezVertexID vertexID1 = TezVertexID.getInstance(dagID, 1);
+
+    // All requests and containers use the same resource and host; only the task
+    // environments differ, so allocation below depends on the environment alone.
+    Map<String, String> env1 = ImmutableMap.of("env1", "val1", "env2", "val2");
+    Map<String, String> env2 = ImmutableMap.of("env3", "val3", "env4", "val4");
+    Map<String, String> env3 = ImmutableMap.of("env1", "val1");
+    Map<String, String> env4 = ImmutableMap.of("env1", "val1", "env4", "val4");
+
+    // Vertex 1, Task 1, Attempt 1, host1, env1 = {env1=val1, env2=val2}
+    TezTaskAttemptID taID11 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 1), 1);
+    TaskAttempt ta11 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent1 = createLaunchRequestEvent(
+        taID11, ta11, resource1, host1, racks, priority1, "", env1);
+
+    // Vertex 1, Task 2, Attempt 1, host1, env2 = {env3=val3, env4=val4}
+    TezTaskAttemptID taID12 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 2), 1);
+    TaskAttempt ta12 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent2 = createLaunchRequestEvent(
+        taID12, ta12, resource1, host1, racks, priority1, "", env2);
+
+    // Vertex 1, Task 3, Attempt 1, host1, env3 = {env1=val1} (a subset of env1)
+    TezTaskAttemptID taID13 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 3), 1);
+    TaskAttempt ta13 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent3 = createLaunchRequestEvent(
+        taID13, ta13, resource1, host1, racks, priority1, "", env3);
+
+    // Vertex 1, Task 4, Attempt 1, host1, env4 = {env1=val1, env4=val4}
+    TezTaskAttemptID taID14 = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexID1, 4), 1);
+    TaskAttempt ta14 = mock(TaskAttempt.class);
+    AMSchedulerEventTALaunchRequest lrEvent4 = createLaunchRequestEvent(
+        taID14, ta14, resource1, host1, racks, priority1, "", env4);
+
+    taskSchedulerManager.handleEvent(lrEvent1);
+    taskSchedulerManager.handleEvent(lrEvent2);
+    taskSchedulerManager.handleEvent(lrEvent3);
+    taskSchedulerManager.handleEvent(lrEvent4);
+
+    Container container1 = createContainer(1, "host1", resource1, priority1);
+    Container container2 = createContainer(2, "host1", resource1, priority1);
+    Container container3 = createContainer(3, "host1", resource1, priority1);
+
+    // One container allocated, should start ta11 on container1
+    taskScheduler.onContainersAllocated(Collections.singletonList(container1));
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta11), any(Object.class), eq(container1));
+
+    // finish ta11, should start ta13 on container1 (ta13's env3 is a subset of env1)
+    taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta11, container1.getId(),
+        TaskAttemptState.SUCCEEDED, null, null, 0));
+    drainableAppCallback.drain();
+    verifyDeAllocateTask(taskScheduler, ta11, true, null, null);
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta13), any(Object.class), eq(container1));
+    verify(rmClient, times(0)).releaseAssignedContainer(eq(container1.getId()));
+    eventHandler.verifyNoInvocations(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    // ta13 finished; pending env2/env4 are not subsets of env1, should release container1
+    taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta13, container1.getId(),
+        TaskAttemptState.SUCCEEDED, null, null, 0));
+    drainableAppCallback.drain();
+    verifyDeAllocateTask(taskScheduler, ta13, true, null, null);
+    verify(rmClient).releaseAssignedContainer(eq(container1.getId()));
+    eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    // Second container allocated, should start ta12.
+    taskScheduler.onContainersAllocated(Collections.singletonList(container2));
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta12), any(Object.class), eq(container2));
+
+    // ta12 finished; pending env4 is not a subset of env2, should release container2
+    taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta12, container2.getId(),
+        TaskAttemptState.SUCCEEDED, null, null, 0));
+    drainableAppCallback.drain();
+    verifyDeAllocateTask(taskScheduler, ta12, true, null, null);
+    verify(rmClient).releaseAssignedContainer(eq(container2.getId()));
+    eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    // Third container allocated, should start ta14.
+    taskScheduler.onContainersAllocated(Collections.singletonList(container3));
+    TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
+    drainableAppCallback.drain();
+    verify(taskSchedulerManager).taskAllocated(eq(0), eq(ta14), any(Object.class), eq(container3));
+
+    // ta14 finished and no pending requests remain, should release container3
+    taskSchedulerManager.handleEvent(new AMSchedulerEventTAEnded(ta14, container3.getId(),
+        TaskAttemptState.SUCCEEDED, null, null, 0));
+    drainableAppCallback.drain();
+    verifyDeAllocateTask(taskScheduler, ta14, true, null, null);
+    verify(rmClient).releaseAssignedContainer(eq(container3.getId()));
+    eventHandler.verifyInvocation(AMContainerEventStopRequest.class);
+    eventHandler.reset();
+
+    taskScheduler.shutdown();
+    taskSchedulerManager.close();
+  }
+
   private Container createContainer(int id, String host, Resource resource, Priority priority) {
     @SuppressWarnings("deprecation")
     ContainerId containerID = ContainerId.newInstance(
@@ -1400,6 +1713,14 @@ public class TestContainerReuse {
         new ContainerContext(localResources, new Credentials(), new HashMap<String, String>(), jvmOpts));
   }
 
+  private AMSchedulerEventTALaunchRequest createLaunchRequestEvent(TezTaskAttemptID taID,
+      TaskAttempt ta, Resource capability, String[] hosts, String[] racks, Priority priority,
+      String jvmOpts, Map<String, String> environment) {
+    return createLaunchRequestEvent(taID, ta, capability, hosts, racks, priority,
+        new ContainerContext(new HashMap<String, LocalResource>(), new Credentials(), environment,
+            jvmOpts)); // context has no local resources; env and jvmOpts come from the caller
+  }
+
   private static class ChangingDAGIDAnswer implements Answer<TezDAGID> {
 
     private TezDAGID dagID;