You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ab...@apache.org on 2019/09/12 02:40:20 UTC

[hadoop] branch trunk updated: YARN-9819. Make TestOpportunisticContainerAllocatorAMService more resilient. Contributed by Abhishek Modi

This is an automated email from the ASF dual-hosted git repository.

abmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3b06f0b  YARN-9819. Make TestOpportunisticContainerAllocatorAMService more resilient. Contributed by Abhishek Modi
3b06f0b is described below

commit 3b06f0bf9e4c3d7bc50e5e2f9b44c1eeec897680
Author: Abhishek Modi <ab...@apache.org>
AuthorDate: Thu Sep 12 08:09:57 2019 +0530

    YARN-9819. Make TestOpportunisticContainerAllocatorAMService more resilient. Contributed by Abhishek Modi
---
 .../hadoop/yarn/server/resourcemanager/MockNM.java |  60 +++++-
 ...stOpportunisticContainerAllocatorAMService.java | 208 +++++++--------------
 2 files changed, 125 insertions(+), 143 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index fe3a889..3543bc4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRe
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
@@ -233,6 +234,13 @@ public class MockNM {
     return nodeHeartbeat(conts, isHealthy, responseId);
   }
 
+  /**
+   * Sends the heartbeat of the node.
+   * @param isHealthy whether node is healthy.
+   * @param resId response id.
+   * @return response of the heartbeat.
+   * @throws Exception
+   */
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
       List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
     ArrayList<ContainerStatus> updatedStats = new ArrayList<ContainerStatus>();
@@ -243,15 +251,62 @@ public class MockNM {
         isHealthy, resId);
   }
 
+  /**
+   * Sends the heartbeat of the node.
+   * @param updatedStats containers with updated status.
+   * @param isHealthy whether node is healthy.
+   * @return response of the heartbeat.
+   * @throws Exception
+   */
   public NodeHeartbeatResponse nodeHeartbeat(
       List<ContainerStatus> updatedStats, boolean isHealthy) throws Exception {
     return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
         isHealthy, responseId);
   }
 
+  /**
+   * Sends the heartbeat of the node.
+   * @param oppContainersStatus opportunistic containers status.
+   * @param isHealthy whether node is healthy.
+   * @return response of the heartbeat.
+   * @throws Exception
+   */
+  public NodeHeartbeatResponse nodeHeartbeat(
+      OpportunisticContainersStatus oppContainersStatus, boolean isHealthy)
+      throws Exception {
+    return nodeHeartbeat(Collections.emptyList(),
+        Collections.emptyList(), isHealthy, responseId, oppContainersStatus);
+  }
+
+  /**
+   * Sends the heartbeat of the node.
+   * @param updatedStats containers with updated status.
+   * @param increasedConts containers whose resource has been increased.
+   * @param isHealthy whether node is healthy.
+   * @param resId response id.
+   * @return response of the heartbeat.
+   * @throws Exception
+   */
+  public NodeHeartbeatResponse nodeHeartbeat(
+      List<ContainerStatus> updatedStats, List<Container> increasedConts,
+      boolean isHealthy, int resId) throws Exception {
+    return nodeHeartbeat(updatedStats, increasedConts,
+        isHealthy, resId, null);
+  }
+
+  /**
+   * Sends the heartbeat of the node.
+   * @param updatedStats containers with updated status.
+   * @param increasedConts containers whose resource has been increased.
+   * @param isHealthy whether node is healthy.
+   * @param resId response id.
+   * @param oppContainersStatus opportunistic containers status.
+   * @return response of the heartbeat.
+   * @throws Exception
+   */
   public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
-      List<Container> increasedConts, boolean isHealthy, int resId)
-          throws Exception {
+      List<Container> increasedConts, boolean isHealthy, int resId,
+      OpportunisticContainersStatus oppContainersStatus) throws Exception {
     NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setResponseId(resId);
@@ -269,6 +324,7 @@ public class MockNM {
       containerStats.remove(cid);
     }
     status.setIncreasedContainers(increasedConts);
+    status.setOpportunisticContainersStatus(oppContainersStatus);
     NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
     healthStatus.setHealthReport("");
     healthStatus.setIsNodeHealthy(isHealthy);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index bdd4f71..7078244 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@@ -90,9 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -103,7 +100,6 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -122,6 +118,9 @@ public class TestOpportunisticContainerAllocatorAMService {
   private MockRM rm;
   private DrainDispatcher dispatcher;
 
+  private OpportunisticContainersStatus oppContainersStatus =
+      getOpportunisticStatus();
+
   @Before
   public void createAndStartRM() {
     CapacitySchedulerConfiguration csConf =
@@ -184,38 +183,24 @@ public class TestOpportunisticContainerAllocatorAMService {
     nm3.registerNode();
     nm4.registerNode();
 
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
+    nm3.nodeHeartbeat(oppContainersStatus, true);
+    nm4.nodeHeartbeat(oppContainersStatus, true);
+
     OpportunisticContainerAllocatorAMService amservice =
         (OpportunisticContainerAllocatorAMService) rm
             .getApplicationMasterService();
     RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
-    ApplicationAttemptId attemptId =
-        app1.getCurrentAppAttempt().getAppAttemptId();
+
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
     ResourceScheduler scheduler = rm.getResourceScheduler();
-    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
-    RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
-    RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId());
-
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
-    nm3.nodeHeartbeat(true);
-    nm4.nodeHeartbeat(true);
-
-    // Send add and update node events to AM Service.
-    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
-    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
-    amservice.handle(new NodeAddedSchedulerEvent(rmNode3));
-    amservice.handle(new NodeAddedSchedulerEvent(rmNode4));
-    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
-    amservice.handle(new NodeUpdateSchedulerEvent(rmNode3));
-    amservice.handle(new NodeUpdateSchedulerEvent(rmNode4));
+
     // All nodes 1 - 4 will be applicable for scheduling.
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
-    nm3.nodeHeartbeat(true);
-    nm4.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
+    nm3.nodeHeartbeat(oppContainersStatus, true);
+    nm4.nodeHeartbeat(oppContainersStatus, true);
 
     GenericTestUtils.waitFor(() ->
         amservice.getLeastLoadedNodes().size() == 4, 10, 10 * 100);
@@ -253,7 +238,7 @@ public class TestOpportunisticContainerAllocatorAMService {
             container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
             null, ExecutionType.GUARANTEED)));
     // Node on same host should not result in allocation
-    sameHostDiffNode.nodeHeartbeat(true);
+    sameHostDiffNode.nodeHeartbeat(oppContainersStatus, true);
     rm.drainEvents();
     allocateResponse =  am1.allocate(new ArrayList<>(), new ArrayList<>());
     Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
@@ -296,7 +281,7 @@ public class TestOpportunisticContainerAllocatorAMService {
             .getUpdateContainerRequest().getContainerId());
 
     // Ensure after correct node heartbeats, we should get the allocation
-    allocNode.nodeHeartbeat(true);
+    allocNode.nodeHeartbeat(oppContainersStatus, true);
     rm.drainEvents();
     allocateResponse =  am1.allocate(new ArrayList<>(), new ArrayList<>());
     Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
@@ -310,10 +295,10 @@ public class TestOpportunisticContainerAllocatorAMService {
     // Allocated cores+mem should have increased, available should decrease
     verifyMetrics(metrics, 14336, 14, 2048, 2, 2);
 
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
-    nm3.nodeHeartbeat(true);
-    nm4.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
+    nm3.nodeHeartbeat(oppContainersStatus, true);
+    nm4.nodeHeartbeat(oppContainersStatus, true);
     rm.drainEvents();
 
     // Verify that the container is still in ACQUIRED state wrt the RM.
@@ -352,36 +337,20 @@ public class TestOpportunisticContainerAllocatorAMService {
     nm1.registerNode();
     nm2.registerNode();
 
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
+
     OpportunisticContainerAllocatorAMService amservice =
         (OpportunisticContainerAllocatorAMService) rm
             .getApplicationMasterService();
     RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
-    ApplicationAttemptId attemptId =
-        app1.getCurrentAppAttempt().getAppAttemptId();
+
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
     ResourceScheduler scheduler = rm.getResourceScheduler();
-    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
-
-    ((RMNodeImpl) rmNode1)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-    ((RMNodeImpl) rmNode2)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-
-    OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
-        .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
-    // Send add and update node events to AM Service.
-    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
-    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
-    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
 
     // All nodes 1 to 2 will be applicable for scheduling.
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
 
     GenericTestUtils.waitFor(() ->
         amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
@@ -478,36 +447,21 @@ public class TestOpportunisticContainerAllocatorAMService {
     nm1.registerNode();
     nm2.registerNode();
 
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
+
     OpportunisticContainerAllocatorAMService amservice =
         (OpportunisticContainerAllocatorAMService) rm
             .getApplicationMasterService();
     RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
-    ApplicationAttemptId attemptId =
-        app1.getCurrentAppAttempt().getAppAttemptId();
+
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
     ResourceScheduler scheduler = rm.getResourceScheduler();
-    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
 
-    ((RMNodeImpl) rmNode1)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-    ((RMNodeImpl) rmNode2)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-
-    OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
-        .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
-    // Send add and update node events to AM Service.
-    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
-    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
-    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
 
     // All nodes 1 to 2 will be applicable for scheduling.
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
 
     GenericTestUtils.waitFor(() ->
         amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
@@ -591,30 +545,17 @@ public class TestOpportunisticContainerAllocatorAMService {
     createAndStartRMWithAutoUpdateContainer();
     MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
     nm1.registerNode();
+    nm1.nodeHeartbeat(oppContainersStatus, true);
 
     OpportunisticContainerAllocatorAMService amservice =
         (OpportunisticContainerAllocatorAMService) rm
             .getApplicationMasterService();
     RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
-    ApplicationAttemptId attemptId =
-        app1.getCurrentAppAttempt().getAppAttemptId();
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
     ResourceScheduler scheduler = rm.getResourceScheduler();
     RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
 
-    nm1.nodeHeartbeat(true);
-
-    ((RMNodeImpl) rmNode1)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-
-    OpportunisticContainerContext ctxt =
-        ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
-            .getOpportunisticContainerContext();
-    // Send add and update node events to AM Service.
-    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
-    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
-
-    nm1.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(oppContainersStatus, true);
 
     GenericTestUtils.waitFor(() ->
         amservice.getLeastLoadedNodes().size() == 1, 10, 10 * 100);
@@ -713,7 +654,7 @@ public class TestOpportunisticContainerAllocatorAMService {
     Assert.assertEquals(Resource.newInstance(1 * GB, 1),
         response.getContainersToUpdate().get(0).getResource());
 
-    nm1.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(oppContainersStatus, true);
     // DEMOTE the container
     allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
         UpdateContainerRequest.newInstance(3, container.getId(),
@@ -735,7 +676,7 @@ public class TestOpportunisticContainerAllocatorAMService {
         uc.getContainer().getExecutionType());
     // Check that the container is updated in NM through NM heartbeat response
     if (response.getContainersToUpdate().size() == 0) {
-      response = nm1.nodeHeartbeat(true);
+      response = nm1.nodeHeartbeat(oppContainersStatus, true);
     }
     Assert.assertEquals(1, response.getContainersToUpdate().size());
     Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
@@ -761,6 +702,10 @@ public class TestOpportunisticContainerAllocatorAMService {
     nodes.put(nm2.getNodeId(), nm2);
     nm1.registerNode();
     nm2.registerNode();
+
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
+
     OpportunisticSchedulerMetrics metrics =
         OpportunisticSchedulerMetrics.getMetrics();
 
@@ -777,28 +722,10 @@ public class TestOpportunisticContainerAllocatorAMService {
         app1.getCurrentAppAttempt().getAppAttemptId();
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
     ResourceScheduler scheduler = rm.getResourceScheduler();
-    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
-    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
-
-    ((RMNodeImpl) rmNode1)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-    ((RMNodeImpl) rmNode2)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-
-    OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
-        .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
-    // Send add and update node events to AM Service.
-    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
-    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
-    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
 
     // All nodes 1 to 2 will be applicable for scheduling.
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
 
     GenericTestUtils.waitFor(() ->
         amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
@@ -890,6 +817,10 @@ public class TestOpportunisticContainerAllocatorAMService {
     MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
     nm1.registerNode();
     nm2.registerNode();
+
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
+
     OpportunisticContainerAllocatorAMService amservice =
         (OpportunisticContainerAllocatorAMService) rm
             .getApplicationMasterService();
@@ -900,20 +831,14 @@ public class TestOpportunisticContainerAllocatorAMService {
     ResourceScheduler scheduler = rm.getResourceScheduler();
     RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
     RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
-    nm1.nodeHeartbeat(true);
-    nm2.nodeHeartbeat(true);
-    ((RMNodeImpl) rmNode1)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-    ((RMNodeImpl) rmNode2)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
     OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
         .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
-    // Send add and update node events to AM Service.
-    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
-    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
-    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
-    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
     // Both node 1 and node 2 will be applicable for scheduling.
+    nm1.nodeHeartbeat(oppContainersStatus, true);
+    nm2.nodeHeartbeat(oppContainersStatus, true);
+
     for (int i = 0; i < 10; i++) {
       am1.allocate(
           Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
@@ -948,10 +873,10 @@ public class TestOpportunisticContainerAllocatorAMService {
   @Test(timeout = 60000)
   public void testAppAttemptRemovalAfterNodeRemoval() throws Exception {
     MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
+
     nm.registerNode();
-    OpportunisticContainerAllocatorAMService amservice =
-        (OpportunisticContainerAllocatorAMService) rm
-            .getApplicationMasterService();
+    nm.nodeHeartbeat(oppContainersStatus, true);
+
     RMApp app = rm.submitApp(1 * GB, "app", "user", null, "default");
     ApplicationAttemptId attemptId =
         app.getCurrentAppAttempt().getAppAttemptId();
@@ -960,12 +885,8 @@ public class TestOpportunisticContainerAllocatorAMService {
     SchedulerApplicationAttempt schedulerAttempt =
         ((CapacityScheduler)scheduler).getApplicationAttempt(attemptId);
     RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm.getNodeId());
-    nm.nodeHeartbeat(true);
-    ((RMNodeImpl) rmNode1)
-        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-    // Send add and update node events to AM Service.
-    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
-    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    nm.nodeHeartbeat(oppContainersStatus, true);
 
     GenericTestUtils.waitFor(() ->
         scheduler.getNumClusterNodes() == 1, 10, 200 * 100);
@@ -1000,13 +921,18 @@ public class TestOpportunisticContainerAllocatorAMService {
         RMAppAttemptState.FAILED, false));
   }
 
+  private OpportunisticContainersStatus getOpportunisticStatus() {
+    return getOppurtunisticStatus(-1, 100, 1000);
+  }
+
   private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime,
-      int queueLength) {
-    OpportunisticContainersStatus status1 =
-        Mockito.mock(OpportunisticContainersStatus.class);
-    Mockito.when(status1.getEstimatedQueueWaitTime()).thenReturn(waitTime);
-    Mockito.when(status1.getWaitQueueLength()).thenReturn(queueLength);
-    return status1;
+      int queueLength, int queueCapacity) {
+    OpportunisticContainersStatus status =
+        OpportunisticContainersStatus.newInstance();
+    status.setEstimatedQueueWaitTime(waitTime);
+    status.setOpportQueueCapacity(queueCapacity);
+    status.setWaitQueueLength(queueLength);
+    return status;
   }
 
   // Test if the OpportunisticContainerAllocatorAMService can handle both


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org