You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2016/11/18 21:55:18 UTC

tez git commit: TEZ-3508. TestTaskScheduler cleanup. (jlowe)

Repository: tez
Updated Branches:
  refs/heads/master b71ea4aab -> 501a351d5


TEZ-3508. TestTaskScheduler cleanup. (jlowe)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/501a351d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/501a351d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/501a351d

Branch: refs/heads/master
Commit: 501a351d59d6bafbd3d4605785492d81a574574b
Parents: b71ea4a
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Nov 18 21:53:39 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Nov 18 21:53:39 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/dag/app/rm/TestTaskScheduler.java       | 689 +------------------
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |  12 +-
 3 files changed, 43 insertions(+), 659 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/501a351d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cac97f9..f13511d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3508. TestTaskScheduler cleanup.
   TEZ-3536. NPE in WebUIService start when host resolution fails.
   TEZ-3269. Provide basic fair routing and scheduling functionality via custom VertexManager and EdgeManager.
   TEZ-3534. Differentiate thread names on Fetchers, minor changes to shuffle shutdown code.

http://git-wip-us.apache.org/repos/asf/tez/blob/501a351d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index 5c8daeb..b3511e8 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -29,32 +29,23 @@ import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.commons.io.IOExceptionWithCause;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -66,17 +57,17 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tez.common.ContainerSignatureMatcher;
 import org.apache.tez.common.MockDNSToSwitchMapping;
 import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
-import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.HeldContainer;
-import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable;
-import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientAsyncForTest;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AMRMClientForTest;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.AlwaysMatchesContainerMatcher;
 import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.PreemptionMatcher;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerContextDrainable;
+import org.apache.tez.dag.app.rm.TestTaskSchedulerHelpers.TaskSchedulerWithDrainableContext;
+import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.CookieContainerRequest;
+import org.apache.tez.dag.app.rm.YarnTaskSchedulerService.HeldContainer;
 import org.apache.tez.serviceplugins.api.DagInfo;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
 import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
@@ -91,6 +82,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 @SuppressWarnings("deprecation")
 public class TestTaskScheduler {
@@ -127,10 +119,8 @@ public class TestTaskScheduler {
   @SuppressWarnings({ "unchecked" })
   @Test(timeout=10000)
   public void testTaskSchedulerNoReuse() throws Exception {
-    RackResolver.init(new YarnConfiguration());
-
-    TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
-                                                  mock(TezAMRMClientAsync.class);
+    AMRMClientAsyncForTest mockRMClient = spy(
+        new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
 
     String appHost = "host";
     int appPort = 0;
@@ -153,33 +143,16 @@ public class TestTaskScheduler {
     // it's the same instance.
     verify(mockRMClient).setHeartbeatInterval(interval);
 
-    RegisterApplicationMasterResponse mockRegResponse =
-                                mock(RegisterApplicationMasterResponse.class);
-    Resource mockMaxResource = mock(Resource.class);
-    Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
-    when(mockRegResponse.getMaximumResourceCapability()).
-                                                   thenReturn(mockMaxResource);
-    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
-    ByteBuffer mockKey = mock(ByteBuffer.class);
-    when(mockRegResponse.getClientToAMTokenMasterKey()).thenReturn(mockKey);
-    when(mockRMClient.
-          registerApplicationMaster(anyString(), anyInt(), anyString())).
-                                                   thenReturn(mockRegResponse);
     scheduler.start();
     drainableAppCallback.drain();
     verify(mockRMClient).start();
     verify(mockRMClient).registerApplicationMaster(appHost, appPort, appUrl);
-    verify(mockApp).setApplicationRegistrationData(mockMaxResource,
-                                                   mockAcls, mockKey);
-
-    when(mockRMClient.getClusterNodeCount()).thenReturn(5);
-    Assert.assertEquals(5, scheduler.getClusterNodeCount());
+    RegisterApplicationMasterResponse regResponse = mockRMClient.getRegistrationResponse();
+    verify(mockApp).setApplicationRegistrationData(regResponse.getMaximumResourceCapability(),
+                                                   regResponse.getApplicationACLs(),
+                                                   regResponse.getClientToAMTokenMasterKey());
 
-    Resource mockClusterResource = mock(Resource.class);
-    when(mockRMClient.getAvailableResources()).
-                                              thenReturn(mockClusterResource);
-    Assert.assertEquals(mockClusterResource,
-                        mockRMClient.getAvailableResources());
+    Assert.assertEquals(scheduler.getClusterNodeCount(), mockRMClient.getClusterNodeCount());
 
     Object mockTask1 = mock(Object.class);
     Object mockCookie1 = mock(Object.class);
@@ -257,84 +230,6 @@ public class TestTaskScheduler {
     ContainerId mockCId4 = mock(ContainerId.class);
     when(mockContainer4.getId()).thenReturn(mockCId4);
     containers.add(mockContainer4);
-    ArrayList<CookieContainerRequest> hostContainers =
-                             new ArrayList<CookieContainerRequest>();
-    hostContainers.add(request1);
-    hostContainers.add(request2);
-    hostContainers.add(request3);
-    ArrayList<CookieContainerRequest> rackContainers =
-                             new ArrayList<CookieContainerRequest>();
-    rackContainers.add(request2);
-    rackContainers.add(request3);
-    ArrayList<CookieContainerRequest> anyContainers =
-                             new ArrayList<CookieContainerRequest>();
-    anyContainers.add(request3);
-
-    final List<ArrayList<CookieContainerRequest>> hostList =
-                        new LinkedList<ArrayList<CookieContainerRequest>>();
-    hostList.add(hostContainers);
-    final List<ArrayList<CookieContainerRequest>> rackList =
-                        new LinkedList<ArrayList<CookieContainerRequest>>();
-    rackList.add(rackContainers);
-    final List<ArrayList<CookieContainerRequest>> anyList =
-                        new LinkedList<ArrayList<CookieContainerRequest>>();
-    anyList.add(anyContainers);
-    final List<ArrayList<CookieContainerRequest>> emptyList =
-                        new LinkedList<ArrayList<CookieContainerRequest>>();
-    // return all requests for host1
-    when(
-        mockRMClient.getMatchingRequests((Priority) any(), eq("host1"),
-            (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return hostList;
-          }
-
-        });
-    // first request matched by host
-    // second request matched to rack. RackResolver by default puts hosts in
-    // /default-rack. We need to workaround by returning rack matches only once
-    when(
-        mockRMClient.getMatchingRequests((Priority) any(), eq("/default-rack"),
-            (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return rackList;
-          }
-
-        }).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return emptyList;
-          }
-
-        });
-    // third request matched to ANY
-    when(
-        mockRMClient.getMatchingRequests((Priority) any(),
-            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return anyList;
-          }
-
-        }).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return emptyList;
-          }
-
-        });
     scheduler.onContainersAllocated(containers);
     drainableAppCallback.drain();
     // first container allocated
@@ -398,9 +293,6 @@ public class TestTaskScheduler {
         null, mockPriority, null, mockCookie4);
     drainableAppCallback.drain();
     verify(mockRMClient, times(5)).addContainerRequest(requestCaptor.capture());
-    CookieContainerRequest request4 = requestCaptor.getValue();
-    anyContainers.clear();
-    anyContainers.add(request4);
     Container mockContainer5 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(mockContainer5.getNodeId().getHost()).thenReturn(badHost);
     when(mockContainer5.getNodeId()).thenReturn(badNodeId);
@@ -408,25 +300,6 @@ public class TestTaskScheduler {
     when(mockContainer5.getId()).thenReturn(mockCId5);
     containers.clear();
     containers.add(mockContainer5);
-    when(
-        mockRMClient.getMatchingRequests((Priority) any(),
-            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return anyList;
-          }
-
-        }).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return emptyList;
-          }
-
-        });
     scheduler.onContainersAllocated(containers);
     drainableAppCallback.drain();
     // no new allocation
@@ -436,34 +309,12 @@ public class TestTaskScheduler {
     verify(mockRMClient, times(4)).releaseAssignedContainer((ContainerId) any());
     // verify request added back
     verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture());
-    CookieContainerRequest request5 = requestCaptor.getValue();
-    anyContainers.clear();
-    anyContainers.add(request5);
     Container mockContainer6 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(mockContainer6.getNodeId().getHost()).thenReturn("host7");
     ContainerId mockCId6 = mock(ContainerId.class);
     when(mockContainer6.getId()).thenReturn(mockCId6);
     containers.clear();
     containers.add(mockContainer6);
-    when(
-        mockRMClient.getMatchingRequests((Priority) any(),
-            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return anyList;
-          }
-
-        }).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return emptyList;
-          }
-
-        });
     scheduler.onContainersAllocated(containers);
     drainableAppCallback.drain();
     // new allocation
@@ -532,11 +383,8 @@ public class TestTaskScheduler {
     verify(mockRMClient).stop();
   }
 
-  @SuppressWarnings({ "unchecked" })
   @Test(timeout=10000)
   public void testTaskSchedulerInitiateStop() throws Exception {
-    RackResolver.init(new YarnConfiguration());
-
     String appHost = "host";
     int appPort = 0;
     String appUrl = "url";
@@ -550,8 +398,8 @@ public class TestTaskScheduler {
     TaskSchedulerContext mockApp = setupMockTaskSchedulerContext(appHost, appPort, appUrl, conf);
     final TaskSchedulerContextDrainable drainableAppCallback = createDrainableContext(mockApp);
 
-    TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
-                                                  mock(TezAMRMClientAsync.class);
+    TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
+        new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
 
     TaskSchedulerWithDrainableContext scheduler =
         new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
@@ -559,20 +407,6 @@ public class TestTaskScheduler {
     scheduler.initialize();
     drainableAppCallback.drain();
 
-    RegisterApplicationMasterResponse mockRegResponse =
-                                mock(RegisterApplicationMasterResponse.class);
-    Resource mockMaxResource = mock(Resource.class);
-    Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
-    when(mockRegResponse.getMaximumResourceCapability()).
-                                                   thenReturn(mockMaxResource);
-    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
-    when(mockRMClient.
-          registerApplicationMaster(anyString(), anyInt(), anyString())).
-                                                   thenReturn(mockRegResponse);
-    Resource mockClusterResource = mock(Resource.class);
-    when(mockRMClient.getAvailableResources()).
-                                              thenReturn(mockClusterResource);
-
     scheduler.start();
     drainableAppCallback.drain();
 
@@ -585,8 +419,6 @@ public class TestTaskScheduler {
     final Priority mockPriority1 = Priority.newInstance(1);
     final Priority mockPriority2 = Priority.newInstance(2);
     final Priority mockPriority3 = Priority.newInstance(3);
-    final Priority mockPriority4 = Priority.newInstance(4);
-    final Priority mockPriority5 = Priority.newInstance(5);
     Object mockTask2 = mock(Object.class);
     when(mockTask2.toString()).thenReturn("task2");
     Object mockCookie2 = mock(Object.class);
@@ -644,93 +476,6 @@ public class TestTaskScheduler {
                              new ArrayList<CookieContainerRequest>();
     anyContainers.add(request3);
 
-    final List<ArrayList<CookieContainerRequest>> hostList =
-                        new LinkedList<ArrayList<CookieContainerRequest>>();
-    hostList.add(hostContainers);
-    final List<ArrayList<CookieContainerRequest>> rackList =
-                        new LinkedList<ArrayList<CookieContainerRequest>>();
-    rackList.add(rackContainers);
-    final List<ArrayList<CookieContainerRequest>> anyList =
-                        new LinkedList<ArrayList<CookieContainerRequest>>();
-    anyList.add(anyContainers);
-    final List<ArrayList<CookieContainerRequest>> emptyList =
-                        new LinkedList<ArrayList<CookieContainerRequest>>();
-    // return pri1 requests for host1
-    when(
-        mockRMClient.getMatchingRequestsForTopPriority(eq("host1"),
-            (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return hostList;
-          }
-
-        });
-    // second request matched to rack. RackResolver by default puts hosts in
-    // /default-rack. We need to workaround by returning rack matches only once
-    when(
-        mockRMClient.getMatchingRequestsForTopPriority(eq("/default-rack"),
-            (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return rackList;
-          }
-
-        }).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return emptyList;
-          }
-
-        });
-    // third request matched to ANY
-    when(
-        mockRMClient.getMatchingRequestsForTopPriority(
-            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return anyList;
-          }
-
-        }).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return emptyList;
-          }
-
-        });
-
-    when(mockRMClient.getTopPriority()).then(
-        new Answer<Priority>() {
-          @Override
-          public Priority answer(
-              InvocationOnMock invocation) throws Throwable {
-            int allocations = drainableAppCallback.count.get();
-            if (allocations == 0) {
-              return mockPriority1;
-            }
-            if (allocations == 1) {
-              return mockPriority2;
-            }
-            if (allocations == 2) {
-              return mockPriority3;
-            }
-            if (allocations == 3) {
-              return mockPriority4;
-            }
-            return null;
-          }
-        });
-
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
     scheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
 
@@ -757,10 +502,8 @@ public class TestTaskScheduler {
   @SuppressWarnings({ "unchecked" })
   @Test(timeout=10000)
   public void testTaskSchedulerWithReuse() throws Exception {
-    RackResolver.init(new YarnConfiguration());
-
-    TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
-                                                  mock(TezAMRMClientAsync.class);
+    TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
+        new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
 
     String appHost = "host";
     int appPort = 0;
@@ -782,21 +525,6 @@ public class TestTaskScheduler {
 
     scheduler.initialize();
     drainableAppCallback.drain();
-
-    RegisterApplicationMasterResponse mockRegResponse =
-                                mock(RegisterApplicationMasterResponse.class);
-    Resource mockMaxResource = mock(Resource.class);
-    Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
-    when(mockRegResponse.getMaximumResourceCapability()).
-                                                   thenReturn(mockMaxResource);
-    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
-    when(mockRMClient.
-          registerApplicationMaster(anyString(), anyInt(), anyString())).
-                                                   thenReturn(mockRegResponse);
-    Resource mockClusterResource = mock(Resource.class);
-    when(mockRMClient.getAvailableResources()).
-                                              thenReturn(mockClusterResource);
-
     scheduler.start();
     drainableAppCallback.drain();
 
@@ -874,103 +602,6 @@ public class TestTaskScheduler {
     when(mockCId3.toString()).thenReturn("container3");
     containers.add(mockContainer3);
 
-    ArrayList<CookieContainerRequest> hostContainers =
-                             new ArrayList<CookieContainerRequest>();
-    hostContainers.add(request1);
-    ArrayList<CookieContainerRequest> rackContainers =
-                             new ArrayList<CookieContainerRequest>();
-    rackContainers.add(request2);
-    ArrayList<CookieContainerRequest> anyContainers =
-                             new ArrayList<CookieContainerRequest>();
-    anyContainers.add(request3);
-
-    final List<ArrayList<CookieContainerRequest>> hostList =
-                        new LinkedList<ArrayList<CookieContainerRequest>>();
-    hostList.add(hostContainers);
-    final List<ArrayList<CookieContainerRequest>> rackList =
-                        new LinkedList<ArrayList<CookieContainerRequest>>();
-    rackList.add(rackContainers);
-    final List<ArrayList<CookieContainerRequest>> anyList =
-                        new LinkedList<ArrayList<CookieContainerRequest>>();
-    anyList.add(anyContainers);
-    final List<ArrayList<CookieContainerRequest>> emptyList =
-                        new LinkedList<ArrayList<CookieContainerRequest>>();
-    // return pri1 requests for host1
-    when(
-        mockRMClient.getMatchingRequestsForTopPriority(eq("host1"),
-            (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return hostList;
-          }
-
-        });
-    // second request matched to rack. RackResolver by default puts hosts in
-    // /default-rack. We need to workaround by returning rack matches only once
-    when(
-        mockRMClient.getMatchingRequestsForTopPriority(eq("/default-rack"),
-            (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return rackList;
-          }
-
-        }).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return emptyList;
-          }
-
-        });
-    // third request matched to ANY
-    when(
-        mockRMClient.getMatchingRequestsForTopPriority(
-            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return anyList;
-          }
-
-        }).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return emptyList;
-          }
-
-        });
-    
-    when(mockRMClient.getTopPriority()).then(        
-        new Answer<Priority>() {
-          @Override
-          public Priority answer(
-              InvocationOnMock invocation) throws Throwable {
-            int allocations = drainableAppCallback.count.get();
-            if (allocations == 0) {
-              return mockPriority1;
-            }
-            if (allocations == 1) {
-              return mockPriority2;
-            }
-            if (allocations == 2) {
-              return mockPriority3;
-            }
-            if (allocations == 3) {
-              return mockPriority4;
-            }
-            return null;
-          }
-        });
-    
     AtomicBoolean drainNotifier = new AtomicBoolean(false);
     scheduler.delayedContainerManager.drainedDelayedContainersForTest = drainNotifier;
     
@@ -1039,9 +670,6 @@ public class TestTaskScheduler {
         null, mockPriority4, null, mockCookie4);
     drainableAppCallback.drain();
     verify(mockRMClient, times(4)).addContainerRequest(requestCaptor.capture());
-    CookieContainerRequest request4 = requestCaptor.getValue();
-    anyContainers.clear();
-    anyContainers.add(request4);
     Container mockContainer5 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(mockContainer5.getNodeId().getHost()).thenReturn(badHost);
     when(mockContainer5.getNodeId()).thenReturn(badNodeId);
@@ -1052,25 +680,6 @@ public class TestTaskScheduler {
     when(mockContainer5.getPriority()).thenReturn(mockPriority4);
     containers.clear();
     containers.add(mockContainer5);
-    when(
-        mockRMClient.getMatchingRequestsForTopPriority(
-            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return anyList;
-          }
-
-        }).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return emptyList;
-          }
-
-        });
     drainNotifier.set(false);
     scheduler.onContainersAllocated(containers);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
@@ -1082,9 +691,6 @@ public class TestTaskScheduler {
     verify(mockRMClient, times(4)).releaseAssignedContainer((ContainerId) any());
     // verify request added back
     verify(mockRMClient, times(5)).addContainerRequest(requestCaptor.capture());
-    CookieContainerRequest request5 = requestCaptor.getValue();
-    anyContainers.clear();
-    anyContainers.add(request5);
     Container mockContainer6 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(mockContainer6.getNodeId().getHost()).thenReturn("host7");
     ContainerId mockCId6 = mock(ContainerId.class);
@@ -1093,25 +699,6 @@ public class TestTaskScheduler {
     when(mockCId6.toString()).thenReturn("container6");
     containers.clear();
     containers.add(mockContainer6);
-    when(
-        mockRMClient.getMatchingRequestsForTopPriority(
-            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return anyList;
-          }
-
-        }).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return emptyList;
-          }
-
-        });
     drainNotifier.set(false);
     scheduler.onContainersAllocated(containers);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
@@ -1131,21 +718,6 @@ public class TestTaskScheduler {
     assertEquals(0, scheduler.blacklistedNodes.size());
     
     // verify container level matching
-    // fudge the top level priority to prevent containers from being released
-    // if top level priority is higher than newly allocated containers then 
-    // they will not be released
-    final AtomicBoolean fudgePriority = new AtomicBoolean(true);
-    when(mockRMClient.getTopPriority()).then(        
-        new Answer<Priority>() {
-          @Override
-          public Priority answer(
-              InvocationOnMock invocation) throws Throwable {
-            if (fudgePriority.get()) {
-              return mockPriority4;
-            }
-            return mockPriority5;
-          }
-        });
     // add a dummy task to prevent release of allocated containers
     Object mockTask5 = mock(Object.class);
     when(mockTask5.toString()).thenReturn("task5");
@@ -1153,7 +725,6 @@ public class TestTaskScheduler {
     scheduler.allocateTask(mockTask5, mockCapability, hosts,
         racks, mockPriority5, null, mockCookie5);
     verify(mockRMClient, times(6)).addContainerRequest(requestCaptor.capture());
-    CookieContainerRequest request6 = requestCaptor.getValue();
     drainableAppCallback.drain();
     // add containers so that we can reference one of them for container specific
     // allocation
@@ -1185,24 +756,6 @@ public class TestTaskScheduler {
     scheduler.allocateTask(mockTask6, mockCapability, mockCId7, mockPriority5, null, mockCookie6);
     drainableAppCallback.drain();
     verify(mockRMClient, times(7)).addContainerRequest(requestCaptor.capture());
-    CookieContainerRequest request7 = requestCaptor.getValue();
-    hostContainers.clear();
-    hostContainers.add(request6);
-    hostContainers.add(request7);
-    
-    when(
-        mockRMClient.getMatchingRequestsForTopPriority(eq("host5"),
-            (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return hostList;
-          }
-
-        });
-    // stop fudging top priority
-    fudgePriority.set(false);
     TestTaskSchedulerHelpers.waitForDelayedDrainNotify(drainNotifier);
     drainableAppCallback.drain();
     verify(mockApp, times(6)).taskAllocated(any(), any(), (Container) any());
@@ -1253,13 +806,10 @@ public class TestTaskScheduler {
     verify(mockRMClient).stop();
   }
   
-  @SuppressWarnings("unchecked")
   @Test (timeout=5000)
   public void testTaskSchedulerDetermineMinHeldContainers() throws Exception {
-    RackResolver.init(new YarnConfiguration());
-
-    TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
-                                                  mock(TezAMRMClientAsync.class);
+    TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
+        new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
 
     String appHost = "host";
     int appPort = 0;
@@ -1273,18 +823,6 @@ public class TestTaskScheduler {
         new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
 
     scheduler.initialize();
-    RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class);
-    Resource mockMaxResource = mock(Resource.class);
-    Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
-    when(mockRegResponse.getMaximumResourceCapability()).thenReturn(
-        mockMaxResource);
-    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
-    when(
-        mockRMClient.registerApplicationMaster(anyString(), anyInt(),
-            anyString())).thenReturn(mockRegResponse);
-    Resource mockClusterResource = mock(Resource.class);
-    when(mockRMClient.getAvailableResources()).thenReturn(mockClusterResource);
-
     scheduler.start();
     
     String rack1 = "r1";
@@ -1427,13 +965,10 @@ public class TestTaskScheduler {
     scheduler.shutdown();
   }
   
-  @SuppressWarnings("unchecked")
   @Test(timeout=5000)
   public void testTaskSchedulerRandomReuseExpireTime() throws Exception {
-    RackResolver.init(new YarnConfiguration());
-
-    TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
-                                                  mock(TezAMRMClientAsync.class);
+    TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
+        new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
 
     String appHost = "host";
     int appPort = 0;
@@ -1463,21 +998,6 @@ public class TestTaskScheduler {
     scheduler1.initialize();
     scheduler2.initialize();
 
-
-    RegisterApplicationMasterResponse mockRegResponse =
-                                mock(RegisterApplicationMasterResponse.class);
-    Resource mockMaxResource = mock(Resource.class);
-    Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
-    when(mockRegResponse.getMaximumResourceCapability()).
-                                                   thenReturn(mockMaxResource);
-    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
-    when(mockRMClient.
-          registerApplicationMaster(anyString(), anyInt(), anyString())).
-                                                   thenReturn(mockRegResponse);
-    Resource mockClusterResource = mock(Resource.class);
-    when(mockRMClient.getAvailableResources()).
-                                              thenReturn(mockClusterResource);
-
     scheduler1.start();
     scheduler2.start();
     
@@ -1509,8 +1029,6 @@ public class TestTaskScheduler {
   @SuppressWarnings({ "unchecked", "rawtypes" })
   @Test (timeout=5000)
   public void testTaskSchedulerPreemption() throws Exception {
-    RackResolver.init(new YarnConfiguration());
-
     TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
                                                   mock(TezAMRMClientAsync.class);
 
@@ -1818,13 +1336,10 @@ public class TestTaskScheduler {
     drainableAppCallback.drain();
   }
   
-  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Test (timeout=5000)
   public void testTaskSchedulerPreemption2() throws Exception {
-    RackResolver.init(new YarnConfiguration());
-
-    TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
-                                                  mock(TezAMRMClientAsync.class);
+    TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
+        new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
 
     String appHost = "host";
     int appPort = 0;
@@ -1845,20 +1360,12 @@ public class TestTaskScheduler {
         new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
 
     scheduler.initialize();
-
-    RegisterApplicationMasterResponse mockRegResponse =
-                       mock(RegisterApplicationMasterResponse.class);
-    when(
-        mockRMClient.registerApplicationMaster(anyString(), anyInt(),
-            anyString())).thenReturn(mockRegResponse);
-
     scheduler.start();
-    Resource totalResource = Resource.newInstance(4000, 4); // high value always reported
-    when(mockRMClient.getAvailableResources()).thenReturn(totalResource);
 
     // no preemption
     scheduler.getProgress();
     drainableAppCallback.drain();
+    Resource totalResource = mockRMClient.getAvailableResources();
     Assert.assertEquals(totalResource, scheduler.getTotalResources());
     verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
 
@@ -1890,12 +1397,6 @@ public class TestTaskScheduler {
     Assert.assertEquals(totalResource, scheduler.getTotalResources());
     verify(mockRMClient, times(0)).releaseAssignedContainer((ContainerId)any());
 
-    final List<ArrayList<CookieContainerRequest>> anyList =
-        new LinkedList<ArrayList<CookieContainerRequest>>();
-    final List<ArrayList<CookieContainerRequest>> emptyList =
-        new LinkedList<ArrayList<CookieContainerRequest>>();
-
-    anyList.add(anyContainers);
     List<Container> containers = new ArrayList<Container>();
     Container mockContainer1 = mock(Container.class, RETURNS_DEEP_STUBS);
     when(mockContainer1.getNodeId().getHost()).thenReturn("host1");
@@ -1904,44 +1405,8 @@ public class TestTaskScheduler {
     ContainerId mockCId1 = mock(ContainerId.class);
     when(mockContainer1.getId()).thenReturn(mockCId1);
     containers.add(mockContainer1);
-    when(
-        mockRMClient.getMatchingRequests((Priority) any(), eq("host1"),
-            (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return emptyList;
-          }
-        });
-    // RackResolver by default puts hosts in default-rack
-    when(
-        mockRMClient.getMatchingRequests((Priority) any(), eq("/default-rack"),
-            (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return emptyList;
-          }
-        });
-    when(
-        mockRMClient.getMatchingRequests((Priority) any(),
-            eq(ResourceRequest.ANY), (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          int calls = 0;
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            if(calls > 0) {
-              anyContainers.remove(0);
-            }
-            calls++;
-            return anyList;
-          }
-        });
     
-    Mockito.doAnswer(new Answer() {
+    Mockito.doAnswer(new Answer<Object>() {
       public Object answer(InvocationOnMock invocation) {
           Object[] args = invocation.getArguments();
           ContainerId cId = (ContainerId) args[0];
@@ -2031,13 +1496,10 @@ public class TestTaskScheduler {
     drainableAppCallback.drain();
   }
 
-  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testLocalityMatching() throws Exception {
-
-    RackResolver.init(new Configuration());
-    TezAMRMClientAsync<CookieContainerRequest> amrmClient =
-      mock(TezAMRMClientAsync.class);
+    TezAMRMClientAsync<CookieContainerRequest> amrmClient = spy(
+        new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
 
     Configuration conf = new Configuration();
     conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
@@ -2049,16 +1511,6 @@ public class TestTaskScheduler {
         new TaskSchedulerWithDrainableContext(drainableAppCallback, amrmClient);
 
     taskScheduler.initialize();
-    
-    RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class);
-    Resource mockMaxResource = mock(Resource.class);
-    Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
-    when(mockRegResponse.getMaximumResourceCapability()).thenReturn(
-        mockMaxResource);
-    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
-    when(amrmClient.registerApplicationMaster(anyString(), anyInt(), anyString()))
-        .thenReturn(mockRegResponse);
-    
     taskScheduler.start();
     
     Resource resource = Resource.newInstance(1024, 1);
@@ -2087,7 +1539,6 @@ public class TestTaskScheduler {
     allocatedContainers.add(containerHost3);
     allocatedContainers.add(containerHost1);
 
-    final Map<String, List<CookieContainerRequest>> matchingMap = new HashMap<String, List<CookieContainerRequest>>();
     taskScheduler.allocateTask(mockTask1, resource, hostsTask1, defaultRack,
         priority, null, mockCookie1);
     drainableAppCallback.drain();
@@ -2096,8 +1547,6 @@ public class TestTaskScheduler {
     host1List.add(mockCookie1);
     List<CookieContainerRequest> defaultRackList = new ArrayList<CookieContainerRequest>();
     defaultRackList.add(mockCookie1);
-    matchingMap.put(hostsTask1[0], host1List);
-    matchingMap.put(defaultRack[0], defaultRackList);
 
     List<CookieContainerRequest> nonAllocatedHostList = new ArrayList<YarnTaskSchedulerService.CookieContainerRequest>();
     nonAllocatedHostList.add(mockCookie2);
@@ -2106,48 +1555,11 @@ public class TestTaskScheduler {
     taskScheduler.allocateTask(mockTask2, resource, hostsTask2, otherRack,
         priority, null, mockCookie2);
     drainableAppCallback.drain();
-    matchingMap.put(hostsTask2[0], nonAllocatedHostList);
-    matchingMap.put(otherRack[0], otherRackList);
 
     List<CookieContainerRequest> anyList = new LinkedList<YarnTaskSchedulerService.CookieContainerRequest>();
     anyList.add(mockCookie1);
     anyList.add(mockCookie2);
 
-    matchingMap.put(ResourceRequest.ANY, anyList);
-
-    final List<ArrayList<CookieContainerRequest>> emptyList = new LinkedList<ArrayList<CookieContainerRequest>>();
-
-    when(
-        amrmClient.getMatchingRequests((Priority) any(), anyString(),
-            (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            String location = (String) invocation.getArguments()[1];
-            if (matchingMap.get(location) != null) {
-              CookieContainerRequest matched = matchingMap.get(location).get(0);
-              // Remove matched from matchingMap - assuming TaskScheduler will
-              // pick the first entry.
-              Iterator<Entry<String, List<CookieContainerRequest>>> iter = matchingMap
-                  .entrySet().iterator();
-              while (iter.hasNext()) {
-                Entry<String, List<CookieContainerRequest>> entry = iter.next();
-                if (entry.getValue().remove(matched)) {
-                  if (entry.getValue().size() == 0) {
-                    iter.remove();
-                  }
-                }
-              }
-              return Collections.singletonList(Collections
-                  .singletonList(matched));
-            } else {
-              return emptyList;
-            }
-          }
-        });
-
     taskScheduler.onContainersAllocated(allocatedContainers);
     drainableAppCallback.drain();
 
@@ -2193,13 +1605,10 @@ public class TestTaskScheduler {
     Assert.assertEquals(1, YarnTaskSchedulerService.scaleDownByPreemptionPercentage(1, 1));
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testContainerExpired() throws Exception {
-    RackResolver.init(new YarnConfiguration());
-
-    TezAMRMClientAsync<CookieContainerRequest> mockRMClient =
-                                                  mock(TezAMRMClientAsync.class);
+    TezAMRMClientAsync<CookieContainerRequest> mockRMClient = spy(
+        new AMRMClientAsyncForTest(new AMRMClientForTest(), 100));
 
     String appHost = "host";
     int appPort = 0;
@@ -2218,22 +1627,6 @@ public class TestTaskScheduler {
         new TaskSchedulerWithDrainableContext(drainableAppCallback, mockRMClient);
 
     scheduler.initialize();
-    drainableAppCallback.drain();
-
-    RegisterApplicationMasterResponse mockRegResponse =
-                                mock(RegisterApplicationMasterResponse.class);
-    Resource mockMaxResource = mock(Resource.class);
-    Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
-    when(mockRegResponse.getMaximumResourceCapability()).
-                                                   thenReturn(mockMaxResource);
-    when(mockRegResponse.getApplicationACLs()).thenReturn(mockAcls);
-    when(mockRMClient.
-          registerApplicationMaster(anyString(), anyInt(), anyString())).
-                                                   thenReturn(mockRegResponse);
-    Resource mockClusterResource = mock(Resource.class);
-    when(mockRMClient.getAvailableResources()).
-                                              thenReturn(mockClusterResource);
-
     scheduler.start();
     drainableAppCallback.drain();
 
@@ -2263,7 +1656,6 @@ public class TestTaskScheduler {
     drainableAppCallback.drain();
     verify(mockRMClient, times(2)).
                                 addContainerRequest(requestCaptor.capture());
-    CookieContainerRequest request1 = requestCaptor.getValue();
 
     List<Container> containers = new ArrayList<Container>();
     // sending only lower priority container to make sure its not matched
@@ -2275,25 +1667,6 @@ public class TestTaskScheduler {
     when(mockContainer2.getId()).thenReturn(mockCId2);
     when(mockCId2.toString()).thenReturn("container2");
     containers.add(mockContainer2);
-    ArrayList<CookieContainerRequest> hostContainers =
-                             new ArrayList<CookieContainerRequest>();
-    hostContainers.add(request1);
-    final List<ArrayList<CookieContainerRequest>> hostList =
-                        new LinkedList<ArrayList<CookieContainerRequest>>();
-    hostList.add(hostContainers);
-
-    when(
-        mockRMClient.getMatchingRequestsForTopPriority(eq("host1"),
-            (Resource) any())).thenAnswer(
-        new Answer<List<? extends Collection<CookieContainerRequest>>>() {
-          @Override
-          public List<? extends Collection<CookieContainerRequest>> answer(
-              InvocationOnMock invocation) throws Throwable {
-            return hostList;
-          }
-
-        });
-    when(mockRMClient.getTopPriority()).thenReturn(mockPriority1);
 
     scheduler.onContainersAllocated(containers);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/501a351d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index d8170e3..9a845a1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -79,6 +79,11 @@ class TestTaskSchedulerHelpers {
 
   // Mocking AMRMClientImpl to make use of getMatchingRequest
   static class AMRMClientForTest extends AMRMClientImpl<CookieContainerRequest> {
+    AMRMClientForTest() {
+      super();
+      this.clusterAvailableResources = Resource.newInstance(4000, 4);
+      this.clusterNodeCount = 5;
+    }
 
     @Override
     protected void serviceStart() {
@@ -93,6 +98,7 @@ class TestTaskSchedulerHelpers {
   // Mocking AMRMClientAsyncImpl to make use of getMatchingRequest
   static class AMRMClientAsyncForTest extends
       TezAMRMClientAsync<CookieContainerRequest> {
+    private RegisterApplicationMasterResponse mockRegResponse;
 
     public AMRMClientAsyncForTest(
         AMRMClient<CookieContainerRequest> client,
@@ -105,7 +111,7 @@ class TestTaskSchedulerHelpers {
     @Override
     public RegisterApplicationMasterResponse registerApplicationMaster(
         String appHostName, int appHostPort, String appTrackingUrl) {
-      RegisterApplicationMasterResponse mockRegResponse = mock(RegisterApplicationMasterResponse.class);
+      mockRegResponse = mock(RegisterApplicationMasterResponse.class);
       Resource mockMaxResource = mock(Resource.class);
       Map<ApplicationAccessType, String> mockAcls = mock(Map.class);
       when(mockRegResponse.getMaximumResourceCapability()).thenReturn(
@@ -126,6 +132,10 @@ class TestTaskSchedulerHelpers {
     @Override
     protected void serviceStop() {
     }
+
+    RegisterApplicationMasterResponse getRegistrationResponse() {
+      return mockRegResponse;
+    }
   }
   
   // Overrides start / stop. Will be controlled without the extra event handling thread.