You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2016/11/18 21:55:18 UTC
tez git commit: TEZ-3508. TestTaskScheduler cleanup. (jlowe)
Repository: tez
Updated Branches:
refs/heads/master b71ea4aab -> 501a351d5
TEZ-3508. TestTaskScheduler cleanup. (jlowe)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/501a351d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/501a351d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/501a351d
Branch: refs/heads/master
Commit: 501a351d59d6bafbd3d4605785492d81a574574b
Parents: b71ea4a
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Nov 18 21:53:39 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Nov 18 21:53:39 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/dag/app/rm/TestTaskScheduler.java | 689 +------------------
.../dag/app/rm/TestTaskSchedulerHelpers.java | 12 +-
3 files changed, 43 insertions(+), 659 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/501a351d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cac97f9..f13511d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3508. TestTaskScheduler cleanup.
TEZ-3536. NPE in WebUIService start when host resolution fails.
TEZ-3269. Provide basic fair routing and scheduling functionality via custom VertexManager and EdgeManager.
TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code.
http://git-wip-us.apache.org/repos/asf/tez/blob/501a351d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 5c8daeb..b3511e8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -29,32 +29,23 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.commons.io.IOExceptionWithCause;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -66,17 +57,17 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
-import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.HeldContainer;
-import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable;
-import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.PreemptionMatcher;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext;
+import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
+import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.HeldContainer;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
@@ -91,6 +82,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
@SuppressWarnings("deprecation")
public class TestTaskScheduler {
@@ -127,10 +119,8 @@ public class TestTaskScheduler {
@SuppressWarnings({ "unchecked" })
@Test(timeout=10000)
public void testTaskSchedulerNoReuse() throws Exception {
- RackResolver.init(new YarnConfiguration());
-
- TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
- mock(TezAMRMClientAsync.class);
+ AMRMClientAsyncForTest mockRMClient = spy(
+ new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
String appHost = "host";
int appPort = 0;
@@ -153,33 +143,16 @@ public class TestTaskScheduler {
// it's the same instance.
verify(mockRMClient).setHeartbeatInterval(interval);
- RegisterApplicationMasterResponse mockRegResponse =
- mock(RegisterApplicationMasterResponse.class);
- Resource mockMaxResource = mock(Resource.class);
- Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
- when(mockRegResponse.getMaximumResourceCapability()).
- thenReturn(mockMaxResource);
- when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
- ByteBuffer mockKey = mock(ByteBuffer.class);
- when(mockRegResponse.getClientToAMTokenMasterKey()).thenReturn(mockKey);
- when(mockRMClient.
- registerApplicationMaster(anyString(), anyInt(), anyString())).
- thenReturn(mockRegResponse);
scheduler.start();
drainableAppCallback.drain();
verify(mockRMClient).start();
verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
- verify(mockApp).setApplicationRegistrationData(mockMaxResource,
- mockAcls, mockKey);
-
- when(mockRMClient.getClusterNodeCount()).thenReturn(5);
- Assert.assertEquals(5, scheduler.getClusterNodeCount());
+ RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse();
+ verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(),
+ regResponse.getApplicationACLs(),
+ regResponse.getClientToAMTokenMasterKey());
- Resource mockClusterResource = mock(Resource.class);
- when(mockRMClient.getAvailableResources()).
- thenReturn(mockClusterResource);
- Assert.assertEquals(mockClusterResource,
- mockRMClient.getAvailableResources());
+ Assert.assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount());
Object mockTask1 = mock(Object.class);
Object mockCookie1 = mock(Object.class);
@@ -257,84 +230,6 @@ public class TestTaskScheduler {
ContainerId mockCId4 = mock(ContainerId.class);
when(mockContainer4.getId()).thenReturn(mockCId4);
containers.add(mockContainer4);
- ArrayList<CookieContainerRequest> hostContainers =
- new ArrayList<CookieContainerRequest>();
- hostContainers.add(request1);
- hostContainers.add(request2);
- hostContainers.add(request3);
- ArrayList<CookieContainerRequest> rackContainers =
- new ArrayList<CookieContainerRequest>();
- rackContainers.add(request2);
- rackContainers.add(request3);
- ArrayList<CookieContainerRequest> anyContainers =
- new ArrayList<CookieContainerRequest>();
- anyContainers.add(request3);
-
- final List<ArrayList<CookieContainerRequest>> hostList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
- hostList.add(hostContainers);
- final List<ArrayList<CookieContainerRequest>> rackList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
- rackList.add(rackContainers);
- final List<ArrayList<CookieContainerRequest>> anyList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
- anyList.add(anyContainers);
- final List<ArrayList<CookieContainerRequest>> emptyList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
- // return all requests for host1
- when(
- mockRMClient.getMatchingRequests((Priority) any(), eq("host1"),
- (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return hostList;
- }
-
- });
- // first request matched by host
- // second request matched to rack. RackResolver by default puts hosts in
- // /default-rack. We need to workaround by returning rack matches only once
- when(
- mockRMClient.getMatchingRequests((Priority) any(), eq("/default-rack"),
- (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return rackList;
- }
-
- }).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return emptyList;
- }
-
- });
- // third request matched to ANY
- when(
- mockRMClient.getMatchingRequests((Priority) any(),
- eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return anyList;
- }
-
- }).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return emptyList;
- }
-
- });
scheduler.onContainersAllocated(containers);
drainableAppCallback.drain();
// first container allocated
@@ -398,9 +293,6 @@ public class TestTaskScheduler {
null, mockPriority, null, mockCookie4);
drainableAppCallback.drain();
verify(mockRMClient, times(5)).addContainerRequest(requestCaptor.capture());
- CookieContainerRequest request4 = requestCaptor.getValue();
- anyContainers.clear();
- anyContainers.add(request4);
Container mockContainer5 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer5.getNodeId().getHost()).thenReturn(badHost);
when(mockContainer5.getNodeId()).thenReturn(badNodeId);
@@ -408,25 +300,6 @@ public class TestTaskScheduler {
when(mockContainer5.getId()).thenReturn(mockCId5);
containers.clear();
containers.add(mockContainer5);
- when(
- mockRMClient.getMatchingRequests((Priority) any(),
- eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return anyList;
- }
-
- }).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return emptyList;
- }
-
- });
scheduler.onContainersAllocated(containers);
drainableAppCallback.drain();
// no new allocation
@@ -436,34 +309,12 @@ public class TestTaskScheduler {
verify(mockRMClient, times(4)).releaseAssignedContainer((ContainerId) any());
// verify request added back
verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture());
- CookieContainerRequest request5 = requestCaptor.getValue();
- anyContainers.clear();
- anyContainers.add(request5);
Container mockContainer6 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer6.getNodeId().getHost()).thenReturn("host7");
ContainerId mockCId6 = mock(ContainerId.class);
when(mockContainer6.getId()).thenReturn(mockCId6);
containers.clear();
containers.add(mockContainer6);
- when(
- mockRMClient.getMatchingRequests((Priority) any(),
- eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return anyList;
- }
-
- }).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return emptyList;
- }
-
- });
scheduler.onContainersAllocated(containers);
drainableAppCallback.drain();
// new allocation
@@ -532,11 +383,8 @@ public class TestTaskScheduler {
verify(mockRMClient).stop();
}
- @SuppressWarnings({ "unchecked" })
@Test(timeout=10000)
public void testTaskSchedulerInitiateStop() throws Exception {
- RackResolver.init(new YarnConfiguration());
-
String appHost = "host";
int appPort = 0;
String appUrl = "url";
@@ -550,8 +398,8 @@ public class TestTaskScheduler {
TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
- TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
- mock(TezAMRMClientAsync.class);
+ TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
+ new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
TaskSchedulerWithDrainableContext scheduler =
new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
@@ -559,20 +407,6 @@ public class TestTaskScheduler {
scheduler.initialize();
drainableAppCallback.drain();
- RegisterApplicationMasterResponse mockRegResponse =
- mock(RegisterApplicationMasterResponse.class);
- Resource mockMaxResource = mock(Resource.class);
- Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
- when(mockRegResponse.getMaximumResourceCapability()).
- thenReturn(mockMaxResource);
- when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
- when(mockRMClient.
- registerApplicationMaster(anyString(), anyInt(), anyString())).
- thenReturn(mockRegResponse);
- Resource mockClusterResource = mock(Resource.class);
- when(mockRMClient.getAvailableResources()).
- thenReturn(mockClusterResource);
-
scheduler.start();
drainableAppCallback.drain();
@@ -585,8 +419,6 @@ public class TestTaskScheduler {
final Priority mockPriority1 = Priority.newInstance(1);
final Priority mockPriority2 = Priority.newInstance(2);
final Priority mockPriority3 = Priority.newInstance(3);
- final Priority mockPriority4 = Priority.newInstance(4);
- final Priority mockPriority5 = Priority.newInstance(5);
Object mockTask2 = mock(Object.class);
when(mockTask2.toString()).thenReturn("task2");
Object mockCookie2 = mock(Object.class);
@@ -644,93 +476,6 @@ public class TestTaskScheduler {
new ArrayList<CookieContainerRequest>();
anyContainers.add(request3);
- final List<ArrayList<CookieContainerRequest>> hostList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
- hostList.add(hostContainers);
- final List<ArrayList<CookieContainerRequest>> rackList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
- rackList.add(rackContainers);
- final List<ArrayList<CookieContainerRequest>> anyList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
- anyList.add(anyContainers);
- final List<ArrayList<CookieContainerRequest>> emptyList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
- // return pri1 requests for host1
- when(
- mockRMClient.getMatchingRequestsForTopPriority(eq("host1"),
- (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return hostList;
- }
-
- });
- // second request matched to rack. RackResolver by default puts hosts in
- // /default-rack. We need to workaround by returning rack matches only once
- when(
- mockRMClient.getMatchingRequestsForTopPriority(eq("/default-rack"),
- (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return rackList;
- }
-
- }).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return emptyList;
- }
-
- });
- // third request matched to ANY
- when(
- mockRMClient.getMatchingRequestsForTopPriority(
- eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return anyList;
- }
-
- }).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return emptyList;
- }
-
- });
-
- when(mockRMClient.getTopPriority()).then(
- new Answer<Priority>() {
- @Override
- public Priority answer(
- InvocationOnMock invocation) throws Throwable {
- int allocations = drainableAppCallback.count.get();
- if (allocations == 0) {
- return mockPriority1;
- }
- if (allocations == 1) {
- return mockPriority2;
- }
- if (allocations == 2) {
- return mockPriority3;
- }
- if (allocations == 3) {
- return mockPriority4;
- }
- return null;
- }
- });
-
AtomicBoolean drainNotifier = new AtomicBoolean(false);
scheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
@@ -757,10 +502,8 @@ public class TestTaskScheduler {
@SuppressWarnings({ "unchecked" })
@Test(timeout=10000)
public void testTaskSchedulerWithReuse() throws Exception {
- RackResolver.init(new YarnConfiguration());
-
- TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
- mock(TezAMRMClientAsync.class);
+ TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
+ new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
String appHost = "host";
int appPort = 0;
@@ -782,21 +525,6 @@ public class TestTaskScheduler {
scheduler.initialize();
drainableAppCallback.drain();
-
- RegisterApplicationMasterResponse mockRegResponse =
- mock(RegisterApplicationMasterResponse.class);
- Resource mockMaxResource = mock(Resource.class);
- Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
- when(mockRegResponse.getMaximumResourceCapability()).
- thenReturn(mockMaxResource);
- when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
- when(mockRMClient.
- registerApplicationMaster(anyString(), anyInt(), anyString())).
- thenReturn(mockRegResponse);
- Resource mockClusterResource = mock(Resource.class);
- when(mockRMClient.getAvailableResources()).
- thenReturn(mockClusterResource);
-
scheduler.start();
drainableAppCallback.drain();
@@ -874,103 +602,6 @@ public class TestTaskScheduler {
when(mockCId3.toString()).thenReturn("container3");
containers.add(mockContainer3);
- ArrayList<CookieContainerRequest> hostContainers =
- new ArrayList<CookieContainerRequest>();
- hostContainers.add(request1);
- ArrayList<CookieContainerRequest> rackContainers =
- new ArrayList<CookieContainerRequest>();
- rackContainers.add(request2);
- ArrayList<CookieContainerRequest> anyContainers =
- new ArrayList<CookieContainerRequest>();
- anyContainers.add(request3);
-
- final List<ArrayList<CookieContainerRequest>> hostList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
- hostList.add(hostContainers);
- final List<ArrayList<CookieContainerRequest>> rackList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
- rackList.add(rackContainers);
- final List<ArrayList<CookieContainerRequest>> anyList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
- anyList.add(anyContainers);
- final List<ArrayList<CookieContainerRequest>> emptyList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
- // return pri1 requests for host1
- when(
- mockRMClient.getMatchingRequestsForTopPriority(eq("host1"),
- (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return hostList;
- }
-
- });
- // second request matched to rack. RackResolver by default puts hosts in
- // /default-rack. We need to workaround by returning rack matches only once
- when(
- mockRMClient.getMatchingRequestsForTopPriority(eq("/default-rack"),
- (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return rackList;
- }
-
- }).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return emptyList;
- }
-
- });
- // third request matched to ANY
- when(
- mockRMClient.getMatchingRequestsForTopPriority(
- eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return anyList;
- }
-
- }).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return emptyList;
- }
-
- });
-
- when(mockRMClient.getTopPriority()).then(
- new Answer<Priority>() {
- @Override
- public Priority answer(
- InvocationOnMock invocation) throws Throwable {
- int allocations = drainableAppCallback.count.get();
- if (allocations == 0) {
- return mockPriority1;
- }
- if (allocations == 1) {
- return mockPriority2;
- }
- if (allocations == 2) {
- return mockPriority3;
- }
- if (allocations == 3) {
- return mockPriority4;
- }
- return null;
- }
- });
-
AtomicBoolean drainNotifier = new AtomicBoolean(false);
scheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
@@ -1039,9 +670,6 @@ public class TestTaskScheduler {
null, mockPriority4, null, mockCookie4);
drainableAppCallback.drain();
verify(mockRMClient, times(4)).addContainerRequest(requestCaptor.capture());
- CookieContainerRequest request4 = requestCaptor.getValue();
- anyContainers.clear();
- anyContainers.add(request4);
Container mockContainer5 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer5.getNodeId().getHost()).thenReturn(badHost);
when(mockContainer5.getNodeId()).thenReturn(badNodeId);
@@ -1052,25 +680,6 @@ public class TestTaskScheduler {
when(mockContainer5.getPriority()).thenReturn(mockPriority4);
containers.clear();
containers.add(mockContainer5);
- when(
- mockRMClient.getMatchingRequestsForTopPriority(
- eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return anyList;
- }
-
- }).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return emptyList;
- }
-
- });
drainNotifier.set(false);
scheduler.onContainersAllocated(containers);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
@@ -1082,9 +691,6 @@ public class TestTaskScheduler {
verify(mockRMClient, times(4)).releaseAssignedContainer((ContainerId) any());
// verify request added back
verify(mockRMClient, times(5)).addContainerRequest(requestCaptor.capture());
- CookieContainerRequest request5 = requestCaptor.getValue();
- anyContainers.clear();
- anyContainers.add(request5);
Container mockContainer6 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer6.getNodeId().getHost()).thenReturn("host7");
ContainerId mockCId6 = mock(ContainerId.class);
@@ -1093,25 +699,6 @@ public class TestTaskScheduler {
when(mockCId6.toString()).thenReturn("container6");
containers.clear();
containers.add(mockContainer6);
- when(
- mockRMClient.getMatchingRequestsForTopPriority(
- eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return anyList;
- }
-
- }).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return emptyList;
- }
-
- });
drainNotifier.set(false);
scheduler.onContainersAllocated(containers);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
@@ -1131,21 +718,6 @@ public class TestTaskScheduler {
assertEquals(0, scheduler.blacklistedNodes.size());
// verify container level matching
- // fudge the top level priority to prevent containers from being released
- // if top level priority is higher than newly allocated containers then
- // they will not be released
- final AtomicBoolean fudgePriority = new AtomicBoolean(true);
- when(mockRMClient.getTopPriority()).then(
- new Answer<Priority>() {
- @Override
- public Priority answer(
- InvocationOnMock invocation) throws Throwable {
- if (fudgePriority.get()) {
- return mockPriority4;
- }
- return mockPriority5;
- }
- });
// add a dummy task to prevent release of allocated containers
Object mockTask5 = mock(Object.class);
when(mockTask5.toString()).thenReturn("task5");
@@ -1153,7 +725,6 @@ public class TestTaskScheduler {
scheduler.allocateTask(mockTask5, mockCapability, hosts,
racks, mockPriority5, null, mockCookie5);
verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture());
- CookieContainerRequest request6 = requestCaptor.getValue();
drainableAppCallback.drain();
// add containers so that we can reference one of them for container specific
// allocation
@@ -1185,24 +756,6 @@ public class TestTaskScheduler {
scheduler.allocateTask(mockTask6, mockCapability, mockCId7, mockPriority5, null, mockCookie6);
drainableAppCallback.drain();
verify(mockRMClient, times(7)).addContainerRequest(requestCaptor.capture());
- CookieContainerRequest request7 = requestCaptor.getValue();
- hostContainers.clear();
- hostContainers.add(request6);
- hostContainers.add(request7);
-
- when(
- mockRMClient.getMatchingRequestsForTopPriority(eq("host5"),
- (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return hostList;
- }
-
- });
- // stop fudging top priority
- fudgePriority.set(false);
TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
drainableAppCallback.drain();
verify(mockApp, times(6)).taskAllocated(any(), any(), (Container) any());
@@ -1253,13 +806,10 @@ public class TestTaskScheduler {
verify(mockRMClient).stop();
}
- @SuppressWarnings("unchecked")
@Test (timeout=5000)
public void testTaskSchedulerDetermineMinHeldContainers() throws Exception {
- RackResolver.init(new YarnConfiguration());
-
- TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
- mock(TezAMRMClientAsync.class);
+ TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
+ new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
String appHost = "host";
int appPort = 0;
@@ -1273,18 +823,6 @@ public class TestTaskScheduler {
new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
scheduler.initialize();
- RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class);
- Resource mockMaxResource = mock(Resource.class);
- Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
- when(mockRegResponse.getMaximumResourceCapability()).thenReturn(
- mockMaxResource);
- when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
- when(
- mockRMClient.registerApplicationMaster(anyString(), anyInt(),
- anyString())).thenReturn(mockRegResponse);
- Resource mockClusterResource = mock(Resource.class);
- when(mockRMClient.getAvailableResources()).thenReturn(mockClusterResource);
-
scheduler.start();
String rack1 = "r1";
@@ -1427,13 +965,10 @@ public class TestTaskScheduler {
scheduler.shutdown();
}
- @SuppressWarnings("unchecked")
@Test(timeout=5000)
public void testTaskSchedulerRandomReuseExpireTime() throws Exception {
- RackResolver.init(new YarnConfiguration());
-
- TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
- mock(TezAMRMClientAsync.class);
+ TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
+ new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
String appHost = "host";
int appPort = 0;
@@ -1463,21 +998,6 @@ public class TestTaskScheduler {
scheduler1.initialize();
scheduler2.initialize();
-
- RegisterApplicationMasterResponse mockRegResponse =
- mock(RegisterApplicationMasterResponse.class);
- Resource mockMaxResource = mock(Resource.class);
- Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
- when(mockRegResponse.getMaximumResourceCapability()).
- thenReturn(mockMaxResource);
- when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
- when(mockRMClient.
- registerApplicationMaster(anyString(), anyInt(), anyString())).
- thenReturn(mockRegResponse);
- Resource mockClusterResource = mock(Resource.class);
- when(mockRMClient.getAvailableResources()).
- thenReturn(mockClusterResource);
-
scheduler1.start();
scheduler2.start();
@@ -1509,8 +1029,6 @@ public class TestTaskScheduler {
@SuppressWarnings({ "unchecked", "rawtypes" })
@Test (timeout=5000)
public void testTaskSchedulerPreemption() throws Exception {
- RackResolver.init(new YarnConfiguration());
-
TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
mock(TezAMRMClientAsync.class);
@@ -1818,13 +1336,10 @@ public class TestTaskScheduler {
drainableAppCallback.drain();
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
@Test (timeout=5000)
public void testTaskSchedulerPreemption2() throws Exception {
- RackResolver.init(new YarnConfiguration());
-
- TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
- mock(TezAMRMClientAsync.class);
+ TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
+ new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
String appHost = "host";
int appPort = 0;
@@ -1845,20 +1360,12 @@ public class TestTaskScheduler {
new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
scheduler.initialize();
-
- RegisterApplicationMasterResponse mockRegResponse =
- mock(RegisterApplicationMasterResponse.class);
- when(
- mockRMClient.registerApplicationMaster(anyString(), anyInt(),
- anyString())).thenReturn(mockRegResponse);
-
scheduler.start();
- Resource totalResource = Resource.newInstance(4000, 4); // high value always reported
- when(mockRMClient.getAvailableResources()).thenReturn(totalResource);
// no preemption
scheduler.getProgress();
drainableAppCallback.drain();
+ Resource totalResource = mockRMClient.getAvailableResources();
Assert.assertEquals(totalResource, scheduler.getTotalResources());
verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
@@ -1890,12 +1397,6 @@ public class TestTaskScheduler {
Assert.assertEquals(totalResource, scheduler.getTotalResources());
verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
- final List<ArrayList<CookieContainerRequest>> anyList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
- final List<ArrayList<CookieContainerRequest>> emptyList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
-
- anyList.add(anyContainers);
List<Container> containers = new ArrayList<Container>();
Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
when(mockContainer1.getNodeId().getHost()).thenReturn("host1");
@@ -1904,44 +1405,8 @@ public class TestTaskScheduler {
ContainerId mockCId1 = mock(ContainerId.class);
when(mockContainer1.getId()).thenReturn(mockCId1);
containers.add(mockContainer1);
- when(
- mockRMClient.getMatchingRequests((Priority) any(), eq("host1"),
- (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return emptyList;
- }
- });
- // RackResolver by default puts hosts in default-rack
- when(
- mockRMClient.getMatchingRequests((Priority) any(), eq("/default-rack"),
- (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return emptyList;
- }
- });
- when(
- mockRMClient.getMatchingRequests((Priority) any(),
- eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- int calls = 0;
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- if(calls > 0) {
- anyContainers.remove(0);
- }
- calls++;
- return anyList;
- }
- });
- Mockito.doAnswer(new Answer() {
+ Mockito.doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
ContainerId cId = (ContainerId) args[0];
@@ -2031,13 +1496,10 @@ public class TestTaskScheduler {
drainableAppCallback.drain();
}
- @SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testLocalityMatching() throws Exception {
-
- RackResolver.init(new Configuration());
- TezAMRMClientAsync<CookieContainerRequest> amrmClient =
- mock(TezAMRMClientAsync.class);
+ TezAMRMClientAsync<CookieContainerRequest> amrmClient = spy(
+ new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
Configuration conf = new Configuration();
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
@@ -2049,16 +1511,6 @@ public class TestTaskScheduler {
new TaskSchedulerWithDrainableContext(drainableAppCallback, amrmClient);
taskScheduler.initialize();
-
- RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class);
- Resource mockMaxResource = mock(Resource.class);
- Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
- when(mockRegResponse.getMaximumResourceCapability()).thenReturn(
- mockMaxResource);
- when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
- when(amrmClient.registerApplicationMaster(anyString(), anyInt(), anyString()))
- .thenReturn(mockRegResponse);
-
taskScheduler.start();
Resource resource = Resource.newInstance(1024, 1);
@@ -2087,7 +1539,6 @@ public class TestTaskScheduler {
allocatedContainers.add(containerHost3);
allocatedContainers.add(containerHost1);
- final Map<String, List<CookieContainerRequest>> matchingMap = new HashMap<String, List<CookieContainerRequest>>();
taskScheduler.allocateTask(mockTask1, resource, hostsTask1, defaultRack,
priority, null, mockCookie1);
drainableAppCallback.drain();
@@ -2096,8 +1547,6 @@ public class TestTaskScheduler {
host1List.add(mockCookie1);
List<CookieContainerRequest> defaultRackList = new ArrayList<CookieContainerRequest>();
defaultRackList.add(mockCookie1);
- matchingMap.put(hostsTask1[0], host1List);
- matchingMap.put(defaultRack[0], defaultRackList);
List<CookieContainerRequest> nonAllocatedHostList = new ArrayList<YarnTaskSchedulerService.CookieContainerRequest>();
nonAllocatedHostList.add(mockCookie2);
@@ -2106,48 +1555,11 @@ public class TestTaskScheduler {
taskScheduler.allocateTask(mockTask2, resource, hostsTask2, otherRack,
priority, null, mockCookie2);
drainableAppCallback.drain();
- matchingMap.put(hostsTask2[0], nonAllocatedHostList);
- matchingMap.put(otherRack[0], otherRackList);
List<CookieContainerRequest> anyList = new LinkedList<YarnTaskSchedulerService.CookieContainerRequest>();
anyList.add(mockCookie1);
anyList.add(mockCookie2);
- matchingMap.put(ResourceRequest.ANY, anyList);
-
- final List<ArrayList<CookieContainerRequest>> emptyList = new LinkedList<ArrayList<CookieContainerRequest>>();
-
- when(
- amrmClient.getMatchingRequests((Priority) any(), anyString(),
- (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- String location = (String) invocation.getArguments()[1];
- if (matchingMap.get(location) != null) {
- CookieContainerRequest matched = matchingMap.get(location).get(0);
- // Remove matched from matchingMap - assuming TaskScheduler will
- // pick the first entry.
- Iterator<Entry<String, List<CookieContainerRequest>>> iter = matchingMap
- .entrySet().iterator();
- while (iter.hasNext()) {
- Entry<String, List<CookieContainerRequest>> entry = iter.next();
- if (entry.getValue().remove(matched)) {
- if (entry.getValue().size() == 0) {
- iter.remove();
- }
- }
- }
- return Collections.singletonList(Collections
- .singletonList(matched));
- } else {
- return emptyList;
- }
- }
- });
-
taskScheduler.onContainersAllocated(allocatedContainers);
drainableAppCallback.drain();
@@ -2193,13 +1605,10 @@ public class TestTaskScheduler {
Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(1, 1));
}
- @SuppressWarnings("unchecked")
@Test
public void testContainerExpired() throws Exception {
- RackResolver.init(new YarnConfiguration());
-
- TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
- mock(TezAMRMClientAsync.class);
+ TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
+ new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
String appHost = "host";
int appPort = 0;
@@ -2218,22 +1627,6 @@ public class TestTaskScheduler {
new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
scheduler.initialize();
- drainableAppCallback.drain();
-
- RegisterApplicationMasterResponse mockRegResponse =
- mock(RegisterApplicationMasterResponse.class);
- Resource mockMaxResource = mock(Resource.class);
- Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
- when(mockRegResponse.getMaximumResourceCapability()).
- thenReturn(mockMaxResource);
- when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
- when(mockRMClient.
- registerApplicationMaster(anyString(), anyInt(), anyString())).
- thenReturn(mockRegResponse);
- Resource mockClusterResource = mock(Resource.class);
- when(mockRMClient.getAvailableResources()).
- thenReturn(mockClusterResource);
-
scheduler.start();
drainableAppCallback.drain();
@@ -2263,7 +1656,6 @@ public class TestTaskScheduler {
drainableAppCallback.drain();
verify(mockRMClient, times(2)).
addContainerRequest(requestCaptor.capture());
- CookieContainerRequest request1 = requestCaptor.getValue();
List<Container> containers = new ArrayList<Container>();
// sending only lower priority container to make sure its not matched
@@ -2275,25 +1667,6 @@ public class TestTaskScheduler {
when(mockContainer2.getId()).thenReturn(mockCId2);
when(mockCId2.toString()).thenReturn("container2");
containers.add(mockContainer2);
- ArrayList<CookieContainerRequest> hostContainers =
- new ArrayList<CookieContainerRequest>();
- hostContainers.add(request1);
- final List<ArrayList<CookieContainerRequest>> hostList =
- new LinkedList<ArrayList<CookieContainerRequest>>();
- hostList.add(hostContainers);
-
- when(
- mockRMClient.getMatchingRequestsForTopPriority(eq("host1"),
- (Resource) any())).thenAnswer(
- new Answer<List<? extends Collection<CookieContainerRequest>>>() {
- @Override
- public List<? extends Collection<CookieContainerRequest>> answer(
- InvocationOnMock invocation) throws Throwable {
- return hostList;
- }
-
- });
- when(mockRMClient.getTopPriority()).thenReturn(mockPriority1);
scheduler.onContainersAllocated(containers);
http://git-wip-us.apache.org/repos/asf/tez/blob/501a351d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index d8170e3..9a845a1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -79,6 +79,11 @@ class TestTaskSchedulerHelpers {
// Mocking AMRMClientImpl to make use of getMatchingRequest
static class AMRMClientForTest extends AMRMClientImpl<CookieContainerRequest> {
+ AMRMClientForTest() {
+ super();
+ this.clusterAvailableResources = Resource.newInstance(4000, 4);
+ this.clusterNodeCount = 5;
+ }
@Override
protected void serviceStart() {
@@ -93,6 +98,7 @@ class TestTaskSchedulerHelpers {
// Mocking AMRMClientAsyncImpl to make use of getMatchingRequest
static class AMRMClientAsyncForTest extends
TezAMRMClientAsync<CookieContainerRequest> {
+ private RegisterApplicationMasterResponse mockRegResponse;
public AMRMClientAsyncForTest(
AMRMClient<CookieContainerRequest> client,
@@ -105,7 +111,7 @@ class TestTaskSchedulerHelpers {
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl) {
- RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class);
+ mockRegResponse = mock(RegisterApplicationMasterResponse.class);
Resource mockMaxResource = mock(Resource.class);
Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
when(mockRegResponse.getMaximumResourceCapability()).thenReturn(
@@ -126,6 +132,10 @@ class TestTaskSchedulerHelpers {
@Override
protected void serviceStop() {
}
+
+ RegisterApplicationMasterResponse getRegistrationResponse() {
+ return mockRegResponse;
+ }
}
// Overrides start / stop. Will be controlled without the extra event handling thread.