You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by zh...@apache.org on 2017/08/25 23:50:24 UTC
tez git commit: TEZ-3431. Add unit tests for container release
(Taklon Stephen Wu via zhiyuany)
Repository: tez
Updated Branches:
refs/heads/master 823b1bb3b -> b04e7fce7
TEZ-3431. Add unit tests for container release (Taklon Stephen Wu via zhiyuany)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b04e7fce
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b04e7fce
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b04e7fce
Branch: refs/heads/master
Commit: b04e7fce7ff61bb31b06919da7298aa3a04e1c5f
Parents: 823b1bb
Author: Zhiyuan Yang <zh...@apache.org>
Authored: Fri Aug 25 16:47:01 2017 -0700
Committer: Zhiyuan Yang <zh...@apache.org>
Committed: Fri Aug 25 16:47:01 2017 -0700
----------------------------------------------------------------------
.../tez/dag/app/rm/TestTaskScheduler.java | 123 ++++++++++---------
1 file changed, 66 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b04e7fce/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 16c560e..1a647b1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -89,6 +89,10 @@ public class TestTaskScheduler {
static ContainerSignatureMatcher containerSignatureMatcher = new AlwaysMatchesContainerMatcher();
private ExecutorService contextCallbackExecutor;
+ private static final String DEFAULT_APP_HOST = "host";
+ private static final String DEFAULT_APP_URL = "url";
+ private static final String SUCCEED_APP_MESSAGE = "success";
+ private static final int DEFAULT_APP_PORT = 0;
@BeforeClass
public static void beforeClass() {
@@ -122,16 +126,12 @@ public class TestTaskScheduler {
AMRMClientAsyncForTest mockRMClient = spy(
new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
- String appHost = "host";
- int appPort = 0;
- String appUrl = "url";
-
Configuration conf = new Configuration();
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
int interval = 100;
conf.setInt(TezConfiguration.TEZ_AM_RM_HEARTBEAT_INTERVAL_MS_MAX, interval);
- TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+ TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf);
TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
TaskSchedulerWithDrainableContext scheduler =
@@ -146,7 +146,7 @@ public class TestTaskScheduler {
scheduler.start();
drainableAppCallback.drain();
verify(mockRMClient).start();
- verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
+ verify(mockRMClient).registerApplicationMaster(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL);
RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse();
verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(),
regResponse.getApplicationACLs(),
@@ -372,23 +372,19 @@ public class TestTaskScheduler {
drainableAppCallback.drain();
verify(mockApp).appShutdownRequested();
- String appMsg = "success";
AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+ new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL);
when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
scheduler.shutdown();
drainableAppCallback.drain();
verify(mockRMClient).
unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
- appMsg, appUrl);
+ SUCCEED_APP_MESSAGE, DEFAULT_APP_URL);
verify(mockRMClient).stop();
}
@Test(timeout=10000)
public void testTaskSchedulerInitiateStop() throws Exception {
- String appHost = "host";
- int appPort = 0;
- String appUrl = "url";
Configuration conf = new Configuration();
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
@@ -396,7 +392,7 @@ public class TestTaskScheduler {
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 10000);
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 10000);
- TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+ TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf);
final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
@@ -506,10 +502,6 @@ public class TestTaskScheduler {
TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
- String appHost = "host";
- int appPort = 0;
- String appUrl = "url";
-
Configuration conf = new Configuration();
// to match all in the same pass
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
@@ -517,7 +509,7 @@ public class TestTaskScheduler {
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
- TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+ TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf);
final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
TaskSchedulerWithDrainableContext scheduler =
@@ -795,15 +787,14 @@ public class TestTaskScheduler {
drainableAppCallback.drain();
verify(mockApp).appShutdownRequested();
- String appMsg = "success";
AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+ new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL);
when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
scheduler.shutdown();
drainableAppCallback.drain();
verify(mockRMClient).
unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
- appMsg, appUrl);
+ SUCCEED_APP_MESSAGE, DEFAULT_APP_URL);
verify(mockRMClient).stop();
}
@@ -812,12 +803,8 @@ public class TestTaskScheduler {
TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
- String appHost = "host";
- int appPort = 0;
- String appUrl = "url";
-
- TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, true,
- new Configuration());
+ TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL,
+ true, new Configuration());
final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
TaskSchedulerWithDrainableContext scheduler =
@@ -959,9 +946,48 @@ public class TestTaskScheduler {
verify(mockRMClient, times(2)).releaseAssignedContainer((ContainerId)any());
Assert.assertEquals(5, scheduler.heldContainers.size());
- String appMsg = "success";
AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+ new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL);
+ when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
+ scheduler.shutdown();
+ }
+
+ @Test (timeout=3000)
+ public void testTaskSchedulerHeldContainersReleaseAfterExpired() throws Exception {
+ final TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
+ new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
+ final TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT,
+ DEFAULT_APP_URL, true, new Configuration());
+ final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
+ final TaskSchedulerWithDrainableContext scheduler =
+ new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
+
+ scheduler.initialize();
+ scheduler.start();
+
+ ApplicationAttemptId appId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0);
+ ContainerId containerId = ContainerId.newInstance(appId, 0);
+ Container c1 = mock(Container.class, RETURNS_DEEP_STUBS);
+ when(c1.getNodeId().getHost()).thenReturn(""); // we are mocking directly
+
+ HeldContainer hc1 = mock(HeldContainer.class);
+ when(c1.getId()).thenReturn(containerId);
+ when(hc1.getContainer()).thenReturn(c1);
+ when(hc1.isNew()).thenReturn(false);
+
+ // containerExpiryTime = 0
+ scheduler.heldContainers.put(containerId, hc1);
+
+ long currTime = System.currentTimeMillis();
+ scheduler.delayedContainerManager.addDelayedContainer(hc1.getContainer(), currTime);
+ // sleep and wait for mainLoop() check-in to release this expired held container.
+ Thread.sleep(1000);
+
+ verify(mockRMClient, times(1)).releaseAssignedContainer((ContainerId)any());
+ Assert.assertEquals(0, scheduler.heldContainers.size());
+
+ AppFinalStatus finalStatus =
+ new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL);
when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
scheduler.shutdown();
}
@@ -971,10 +997,6 @@ public class TestTaskScheduler {
TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
- String appHost = "host";
- int appPort = 0;
- String appUrl = "url";
-
long minTime = 1000l;
long maxTime = 100000l;
Configuration conf1 = new Configuration();
@@ -985,8 +1007,8 @@ public class TestTaskScheduler {
conf2.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, minTime);
conf2.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, maxTime);
- TaskSchedulerContext mockApp1 = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf1);
- TaskSchedulerContext mockApp2 = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf2);
+ TaskSchedulerContext mockApp1 = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf1);
+ TaskSchedulerContext mockApp2 = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf2);
final TaskSchedulerContextDrainable drainableAppCallback1 = createDrainableContext(mockApp1);
final TaskSchedulerContextDrainable drainableAppCallback2 = createDrainableContext(mockApp2);
@@ -1018,9 +1040,8 @@ public class TestTaskScheduler {
lastExpireTime = currExpireTime;
}
- String appMsg = "success";
AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, appMsg, appUrl);
+ new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, SUCCEED_APP_MESSAGE, DEFAULT_APP_URL);
when(mockApp1.getFinalAppStatus()).thenReturn(finalStatus);
when(mockApp2.getFinalAppStatus()).thenReturn(finalStatus);
scheduler1.shutdown();
@@ -1033,16 +1054,12 @@ public class TestTaskScheduler {
TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
mock(TezAMRMClientAsync.class);
- String appHost = "host";
- int appPort = 0;
- String appUrl = "url";
-
Configuration conf = new Configuration();
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 3);
- TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, false,
- null, null, new PreemptionMatcher(), conf);
+ TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL,
+ false, null, null, new PreemptionMatcher(), conf);
final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
final TaskSchedulerWithDrainableContext scheduler =
@@ -1331,7 +1348,7 @@ public class TestTaskScheduler {
verify(mockRMClient, times(1)).releaseAssignedContainer(mockCId3A);
AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", appUrl);
+ new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", DEFAULT_APP_URL);
when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
scheduler.shutdown();
drainableAppCallback.drain();
@@ -1342,10 +1359,6 @@ public class TestTaskScheduler {
TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
- String appHost = "host";
- int appPort = 0;
- String appUrl = "url";
-
int waitTime = 1000;
Configuration conf = new Configuration();
@@ -1353,8 +1366,8 @@ public class TestTaskScheduler {
conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_HEARTBEATS_BETWEEN_PREEMPTIONS, 2);
conf.setInt(TezConfiguration.TEZ_AM_PREEMPTION_MAX_WAIT_TIME_MS, waitTime);
- TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, false,
- null, null, new PreemptionMatcher(), conf);
+ TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL,
+ false, null, null, new PreemptionMatcher(), conf);
final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
final TaskSchedulerWithDrainableContext scheduler =
@@ -1491,7 +1504,7 @@ public class TestTaskScheduler {
Assert.assertTrue(oldStartWaitTime < scheduler.highestWaitingRequestWaitStartTime);
AppFinalStatus finalStatus =
- new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", appUrl);
+ new AppFinalStatus(FinalApplicationStatus.SUCCEEDED, "", DEFAULT_APP_URL);
when(mockApp.getFinalAppStatus()).thenReturn(finalStatus);
scheduler.shutdown();
drainableAppCallback.drain();
@@ -1505,7 +1518,7 @@ public class TestTaskScheduler {
Configuration conf = new Configuration();
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
- TaskSchedulerContext appClient = setupMockTaskSchedulerContext("host", 0, "", conf);
+ TaskSchedulerContext appClient = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, "", conf);
final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(appClient);
TaskSchedulerWithDrainableContext taskScheduler =
@@ -1611,10 +1624,6 @@ public class TestTaskScheduler {
TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
- String appHost = "host";
- int appPort = 0;
- String appUrl = "url";
-
Configuration conf = new Configuration();
// to match all in the same pass
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS, 0);
@@ -1622,7 +1631,7 @@ public class TestTaskScheduler {
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MIN_MILLIS, 0);
conf.setLong(TezConfiguration.TEZ_AM_CONTAINER_IDLE_RELEASE_TIMEOUT_MAX_MILLIS, 0);
- TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
+ TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(DEFAULT_APP_HOST, DEFAULT_APP_PORT, DEFAULT_APP_URL, conf);
final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
TaskSchedulerWithDrainableContext scheduler =
new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);