You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/08/25 23:50:24 UTC

tez git commit: TEZ-3431. Add unit tests for container release (Taklon Stephen Wu via zhiyuany)

Repository: tez
Updated Branches:
  refs/heads/master 823b1bb3b -> b04e7fce7


TEZ-3431. Add unit tests for container release (Taklon Stephen Wu via zhiyuany)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b04e7fce
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b04e7fce
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b04e7fce

Branch: refs/heads/master
Commit: b04e7fce7ff61bb31b06919da7298aa3a04e1c5f
Parents: 823b1bb
Author: Zhiyuan Yang <zh...@apache.org>
Authored: Fri Aug 25 16:47:01 2017 -0700
Committer: Zhiyuan Yang <zh...@apache.org>
Committed: Fri Aug 25 16:47:01 2017 -0700

----------------------------------------------------------------------
 .../tez/dag/app/rm/TestTaskScheduler.java       | 123 ++++++++++---------
 1 file changed, 66 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/b04e7fce/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 16c560e..1a647b1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -89,6 +89,10 @@ public class TestTaskScheduler {
 
   static ContainerSignatureMatcher containerSignatureMatcher = new AlwaysMatchesContainerMatcher();
   private ExecutorService contextCallbackExecutor;
+  private static final String DEFAULT_APP_HOST = "host";
+  private static final String DEFAULT_APP_URL = "url";
+  private static final String SUCCEED_APP_MESSAGE = "success";
+  private static final int DEFAULT_APP_PORT = 0;
 
   @BeforeClass
   public static void beforeClass() {
@@ -122,16 +126,12 @@ public class TestTaskScheduler {
     AMRMClientAsyncForTest mockRMClient = spy(
         new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
 
-    String appHost = "host";
-    int appPort = 0;
-    String appUrl = "url";
-
     Configuration conf = new Configuration();
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
     int interval = 100;
     conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, interval);
 
-    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf);
     TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
 
     TaskSchedulerWithDrainableContext scheduler =
@@ -146,7 +146,7 @@ public class TestTaskScheduler {
     scheduler.start();
     drainableAppCallback.drain();
     verify(mockRMClient).start();
-    verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
+    verify(mockRMClient).registerApplicationMaster(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL);
     RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse();
     verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(),
                                                    regResponse.getApplicationACLs(),
@@ -372,23 +372,19 @@ public class TestTaskScheduler {
     drainableAppCallback.drain();
     verify(mockApp).appShutdownRequested();
 
-    String appMsg = "success";
     AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL);
     when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
     scheduler.shutdown();
     drainableAppCallback.drain();
     verify(mockRMClient).
                   unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-                      appMsg, appUrl);
+                    SUCCEED_APP_MESSAGE, DEFAULT_APP_URL);
     verify(mockRMClient).stop();
   }
 
   @Test(timeout=10000)
   public void testTaskSchedulerInitiateStop() throws Exception {
-    String appHost = "host";
-    int appPort = 0;
-    String appUrl = "url";
 
     Configuration conf = new Configuration();
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
@@ -396,7 +392,7 @@ public class TestTaskScheduler {
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 10000);
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 10000);
 
-    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf);
     final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
 
     TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
@@ -506,10 +502,6 @@ public class TestTaskScheduler {
     TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
         new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
 
-    String appHost = "host";
-    int appPort = 0;
-    String appUrl = "url";
-
     Configuration conf = new Configuration();
     // to match all in the same pass
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
@@ -517,7 +509,7 @@ public class TestTaskScheduler {
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
 
-    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf);
     final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
 
     TaskSchedulerWithDrainableContext scheduler =
@@ -795,15 +787,14 @@ public class TestTaskScheduler {
     drainableAppCallback.drain();
     verify(mockApp).appShutdownRequested();
 
-    String appMsg = "success";
     AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL);
     when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
     scheduler.shutdown();
     drainableAppCallback.drain();
     verify(mockRMClient).
                   unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
-                      appMsg, appUrl);
+                    SUCCEED_APP_MESSAGE, DEFAULT_APP_URL);
     verify(mockRMClient).stop();
   }
   
@@ -812,12 +803,8 @@ public class TestTaskScheduler {
     TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
         new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
 
-    String appHost = "host";
-    int appPort = 0;
-    String appUrl = "url";
-
-    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, true,
-        new Configuration());
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL,
+      true, new Configuration());
     final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
 
     TaskSchedulerWithDrainableContext scheduler =
@@ -959,9 +946,48 @@ public class TestTaskScheduler {
     verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any());
     Assert.assertEquals(5, scheduler.heldContainers.size());
 
-    String appMsg = "success";
     AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL);
+    when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+    scheduler.shutdown();
+  }
+
+  @Test (timeout=3000)
+  public void testTaskSchedulerHeldContainersReleaseAfterExpired() throws Exception {
+    final TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
+      new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
+    final TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT,
+      DEFAULT_APP_URL, true, new Configuration());
+    final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+    final TaskSchedulerWithDrainableContext scheduler =
+      new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
+
+    scheduler.initialize();
+    scheduler.start();
+
+    ApplicationAttemptId appId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0);
+    ContainerId containerId = ContainerId.newInstance(appId, 0);
+    Container c1 = mock(Container.class, RETURNS_DEEP_STUBS);
+    when(c1.getNodeId().getHost()).thenReturn(""); // we are mocking directly
+    
+    HeldContainer hc1 = mock(HeldContainer.class);
+    when(c1.getId()).thenReturn(containerId);
+    when(hc1.getContainer()).thenReturn(c1);
+    when(hc1.isNew()).thenReturn(false);
+
+    // containerExpiryTime = 0
+    scheduler.heldContainers.put(containerId, hc1);
+
+    long currTime = System.currentTimeMillis();
+    scheduler.delayedContainerManager.addDelayedContainer(hc1.getContainer(), currTime);
+    // sleep and wait for mainLoop() check-in to release this expired held container.
+    Thread.sleep(1000);
+
+    verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
+    Assert.assertEquals(0, scheduler.heldContainers.size());
+
+    AppFinalStatus finalStatus =
+      new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL);
     when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
     scheduler.shutdown();
   }
@@ -971,10 +997,6 @@ public class TestTaskScheduler {
     TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
         new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
 
-    String appHost = "host";
-    int appPort = 0;
-    String appUrl = "url";
-
     long minTime = 1000l;
     long maxTime = 100000l;
     Configuration conf1 = new Configuration();
@@ -985,8 +1007,8 @@ public class TestTaskScheduler {
     conf2.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, minTime);
     conf2.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, maxTime);
 
-    TaskSchedulerContext mockApp1 = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf1);
-    TaskSchedulerContext mockApp2 = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf2);
+    TaskSchedulerContext mockApp1 = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf1);
+    TaskSchedulerContext mockApp2 = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf2);
     final TaskSchedulerContextDrainable drainableAppCallback1 = createDrainableContext(mockApp1);
     final TaskSchedulerContextDrainable drainableAppCallback2 = createDrainableContext(mockApp2);
 
@@ -1018,9 +1040,8 @@ public class TestTaskScheduler {
       lastExpireTime = currExpireTime;
     }
 
-    String appMsg = "success";
     AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL);
     when(mockApp1.getFinalAppStatus()).thenReturn(finalStatus);
     when(mockApp2.getFinalAppStatus()).thenReturn(finalStatus);
     scheduler1.shutdown();
@@ -1033,16 +1054,12 @@ public class TestTaskScheduler {
     TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
                                                   mock(TezAMRMClientAsync.class);
 
-    String appHost = "host";
-    int appPort = 0;
-    String appUrl = "url";
-
     Configuration conf = new Configuration();
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
     conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 3);
 
-    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, false,
-        null, null, new PreemptionMatcher(), conf);
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL,
+      false, null, null, new PreemptionMatcher(), conf);
     final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
 
     final TaskSchedulerWithDrainableContext scheduler =
@@ -1331,7 +1348,7 @@ public class TestTaskScheduler {
     verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3A);
 
     AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", appUrl);
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", DEFAULT_APP_URL);
     when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
     scheduler.shutdown();
     drainableAppCallback.drain();
@@ -1342,10 +1359,6 @@ public class TestTaskScheduler {
     TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
         new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
 
-    String appHost = "host";
-    int appPort = 0;
-    String appUrl = "url";
-
     int waitTime = 1000;
     
     Configuration conf = new Configuration();
@@ -1353,8 +1366,8 @@ public class TestTaskScheduler {
     conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 2);
     conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS, waitTime);
 
-    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, false,
-        null, null, new PreemptionMatcher(), conf);
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL,
+      false, null, null, new PreemptionMatcher(), conf);
     final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
 
     final TaskSchedulerWithDrainableContext scheduler =
@@ -1491,7 +1504,7 @@ public class TestTaskScheduler {
     Assert.assertTrue(oldStartWaitTime < scheduler.highestWaitingRequestWaitStartTime);
 
     AppFinalStatus finalStatus =
-        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", appUrl);
+        new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", DEFAULT_APP_URL);
     when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
     scheduler.shutdown();
     drainableAppCallback.drain();
@@ -1505,7 +1518,7 @@ public class TestTaskScheduler {
     Configuration conf = new Configuration();
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
 
-    TaskSchedulerContext appClient = setupMockTaskSchedulerContext("host", 0, "", conf);
+    TaskSchedulerContext appClient = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, "", conf);
     final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(appClient);
 
     TaskSchedulerWithDrainableContext taskScheduler =
@@ -1611,10 +1624,6 @@ public class TestTaskScheduler {
     TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
         new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
 
-    String appHost = "host";
-    int appPort = 0;
-    String appUrl = "url";
-
     Configuration conf = new Configuration();
     // to match all in the same pass
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
@@ -1622,7 +1631,7 @@ public class TestTaskScheduler {
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
     conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
 
-    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+    TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf);
     final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
     TaskSchedulerWithDrainableContext scheduler =
         new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);