You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ab...@apache.org on 2019/09/12 02:40:20 UTC
[hadoop] branch trunk updated: YARN-9819. Make
TestOpportunisticContainerAllocatorAMService more resilient. Contribued by
Abhishek Modi
This is an automated email from the ASF dual-hosted git repository.
abmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3b06f0b YARN-9819. Make TestOpportunisticContainerAllocatorAMService more resilient. Contribued by Abhishek Modi
3b06f0b is described below
commit 3b06f0bf9e4c3d7bc50e5e2f9b44c1eeec897680
Author: Abhishek Modi <ab...@apache.org>
AuthorDate: Thu Sep 12 08:09:57 2019 +0530
YARN-9819. Make TestOpportunisticContainerAllocatorAMService more resilient. Contribued by Abhishek Modi
---
.../hadoop/yarn/server/resourcemanager/MockNM.java | 60 +++++-
...stOpportunisticContainerAllocatorAMService.java | 208 +++++++--------------
2 files changed, 125 insertions(+), 143 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index fe3a889..3543bc4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRe
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
@@ -233,6 +234,13 @@ public class MockNM {
return nodeHeartbeat(conts, isHealthy, responseId);
}
+ /**
+ * Sends the heartbeat of the node.
+ * @param isHealthy whether node is healthy.
+ * @param resId response id.
+ * @return response of the heartbeat.
+ * @throws Exception
+ */
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
ArrayList<ContainerStatus> updatedStats = new ArrayList<ContainerStatus>();
@@ -243,15 +251,62 @@ public class MockNM {
isHealthy, resId);
}
+ /**
+ * Sends the heartbeat of the node.
+ * @param updatedStats containers with updated status.
+ * @param isHealthy whether node is healthy.
+ * @return response of the heartbeat.
+ * @throws Exception
+ */
public NodeHeartbeatResponse nodeHeartbeat(
List<ContainerStatus> updatedStats, boolean isHealthy) throws Exception {
return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
isHealthy, responseId);
}
+ /**
+ * Sends the heartbeat of the node.
+ * @param oppContainersStatus opportunistic containers status.
+ * @param isHealthy whether node is healthy.
+ * @return response of the heartbeat.
+ * @throws Exception
+ */
+ public NodeHeartbeatResponse nodeHeartbeat(
+ OpportunisticContainersStatus oppContainersStatus, boolean isHealthy)
+ throws Exception {
+ return nodeHeartbeat(Collections.emptyList(),
+ Collections.emptyList(), isHealthy, responseId, oppContainersStatus);
+ }
+
+ /**
+ * Sends the heartbeat of the node.
+ * @param updatedStats containers with updated status.
+ * @param increasedConts containers whose resource has been increased.
+ * @param isHealthy whether node is healthy.
+ * @param resId response id.
+ * @return response of the heartbeat.
+ * @throws Exception
+ */
+ public NodeHeartbeatResponse nodeHeartbeat(
+ List<ContainerStatus> updatedStats, List<Container> increasedConts,
+ boolean isHealthy, int resId) throws Exception {
+ return nodeHeartbeat(updatedStats, increasedConts,
+ isHealthy, resId, null);
+ }
+
+ /**
+ * Sends the heartbeat of the node.
+ * @param updatedStats containers with updated status.
+ * @param increasedConts containers whose resource has been increased.
+ * @param isHealthy whether node is healthy.
+ * @param resId response id.
+ * @param oppContainersStatus opportunistic containers status.
+ * @return response of the heartbeat.
+ * @throws Exception
+ */
public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
- List<Container> increasedConts, boolean isHealthy, int resId)
- throws Exception {
+ List<Container> increasedConts, boolean isHealthy, int resId,
+ OpportunisticContainersStatus oppContainersStatus) throws Exception {
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus status = Records.newRecord(NodeStatus.class);
status.setResponseId(resId);
@@ -269,6 +324,7 @@ public class MockNM {
containerStats.remove(cid);
}
status.setIncreasedContainers(increasedConts);
+ status.setOpportunisticContainersStatus(oppContainersStatus);
NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
healthStatus.setHealthReport("");
healthStatus.setIsNodeHealthy(isHealthy);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index bdd4f71..7078244 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -79,7 +79,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@@ -90,9 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@@ -103,7 +100,6 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.Mockito;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -122,6 +118,9 @@ public class TestOpportunisticContainerAllocatorAMService {
private MockRM rm;
private DrainDispatcher dispatcher;
+ private OpportunisticContainersStatus oppContainersStatus =
+ getOpportunisticStatus();
+
@Before
public void createAndStartRM() {
CapacitySchedulerConfiguration csConf =
@@ -184,38 +183,24 @@ public class TestOpportunisticContainerAllocatorAMService {
nm3.registerNode();
nm4.registerNode();
+ nm1.nodeHeartbeat(oppContainersStatus, true);
+ nm2.nodeHeartbeat(oppContainersStatus, true);
+ nm3.nodeHeartbeat(oppContainersStatus, true);
+ nm4.nodeHeartbeat(oppContainersStatus, true);
+
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
- ApplicationAttemptId attemptId =
- app1.getCurrentAppAttempt().getAppAttemptId();
+
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
ResourceScheduler scheduler = rm.getResourceScheduler();
- RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
- RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
- RMNode rmNode3 = rm.getRMContext().getRMNodes().get(nm3.getNodeId());
- RMNode rmNode4 = rm.getRMContext().getRMNodes().get(nm4.getNodeId());
-
- nm1.nodeHeartbeat(true);
- nm2.nodeHeartbeat(true);
- nm3.nodeHeartbeat(true);
- nm4.nodeHeartbeat(true);
-
- // Send add and update node events to AM Service.
- amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
- amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
- amservice.handle(new NodeAddedSchedulerEvent(rmNode3));
- amservice.handle(new NodeAddedSchedulerEvent(rmNode4));
- amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
- amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
- amservice.handle(new NodeUpdateSchedulerEvent(rmNode3));
- amservice.handle(new NodeUpdateSchedulerEvent(rmNode4));
+
// All nodes 1 - 4 will be applicable for scheduling.
- nm1.nodeHeartbeat(true);
- nm2.nodeHeartbeat(true);
- nm3.nodeHeartbeat(true);
- nm4.nodeHeartbeat(true);
+ nm1.nodeHeartbeat(oppContainersStatus, true);
+ nm2.nodeHeartbeat(oppContainersStatus, true);
+ nm3.nodeHeartbeat(oppContainersStatus, true);
+ nm4.nodeHeartbeat(oppContainersStatus, true);
GenericTestUtils.waitFor(() ->
amservice.getLeastLoadedNodes().size() == 4, 10, 10 * 100);
@@ -253,7 +238,7 @@ public class TestOpportunisticContainerAllocatorAMService {
container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
null, ExecutionType.GUARANTEED)));
// Node on same host should not result in allocation
- sameHostDiffNode.nodeHeartbeat(true);
+ sameHostDiffNode.nodeHeartbeat(oppContainersStatus, true);
rm.drainEvents();
allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
Assert.assertEquals(0, allocateResponse.getUpdatedContainers().size());
@@ -296,7 +281,7 @@ public class TestOpportunisticContainerAllocatorAMService {
.getUpdateContainerRequest().getContainerId());
// Ensure after correct node heartbeats, we should get the allocation
- allocNode.nodeHeartbeat(true);
+ allocNode.nodeHeartbeat(oppContainersStatus, true);
rm.drainEvents();
allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
Assert.assertEquals(1, allocateResponse.getUpdatedContainers().size());
@@ -310,10 +295,10 @@ public class TestOpportunisticContainerAllocatorAMService {
// Allocated cores+mem should have increased, available should decrease
verifyMetrics(metrics, 14336, 14, 2048, 2, 2);
- nm1.nodeHeartbeat(true);
- nm2.nodeHeartbeat(true);
- nm3.nodeHeartbeat(true);
- nm4.nodeHeartbeat(true);
+ nm1.nodeHeartbeat(oppContainersStatus, true);
+ nm2.nodeHeartbeat(oppContainersStatus, true);
+ nm3.nodeHeartbeat(oppContainersStatus, true);
+ nm4.nodeHeartbeat(oppContainersStatus, true);
rm.drainEvents();
// Verify that the container is still in ACQUIRED state wrt the RM.
@@ -352,36 +337,20 @@ public class TestOpportunisticContainerAllocatorAMService {
nm1.registerNode();
nm2.registerNode();
+ nm1.nodeHeartbeat(oppContainersStatus, true);
+ nm2.nodeHeartbeat(oppContainersStatus, true);
+
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
- ApplicationAttemptId attemptId =
- app1.getCurrentAppAttempt().getAppAttemptId();
+
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
ResourceScheduler scheduler = rm.getResourceScheduler();
- RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
- RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
-
- nm1.nodeHeartbeat(true);
- nm2.nodeHeartbeat(true);
-
- ((RMNodeImpl) rmNode1)
- .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
- ((RMNodeImpl) rmNode2)
- .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-
- OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
- .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
- // Send add and update node events to AM Service.
- amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
- amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
- amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
- amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
// All nodes 1 to 2 will be applicable for scheduling.
- nm1.nodeHeartbeat(true);
- nm2.nodeHeartbeat(true);
+ nm1.nodeHeartbeat(oppContainersStatus, true);
+ nm2.nodeHeartbeat(oppContainersStatus, true);
GenericTestUtils.waitFor(() ->
amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
@@ -478,36 +447,21 @@ public class TestOpportunisticContainerAllocatorAMService {
nm1.registerNode();
nm2.registerNode();
+ nm1.nodeHeartbeat(oppContainersStatus, true);
+ nm2.nodeHeartbeat(oppContainersStatus, true);
+
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
- ApplicationAttemptId attemptId =
- app1.getCurrentAppAttempt().getAppAttemptId();
+
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
ResourceScheduler scheduler = rm.getResourceScheduler();
- RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
- RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
-
- nm1.nodeHeartbeat(true);
- nm2.nodeHeartbeat(true);
- ((RMNodeImpl) rmNode1)
- .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
- ((RMNodeImpl) rmNode2)
- .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-
- OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
- .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
- // Send add and update node events to AM Service.
- amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
- amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
- amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
- amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
// All nodes 1 to 2 will be applicable for scheduling.
- nm1.nodeHeartbeat(true);
- nm2.nodeHeartbeat(true);
+ nm1.nodeHeartbeat(oppContainersStatus, true);
+ nm2.nodeHeartbeat(oppContainersStatus, true);
GenericTestUtils.waitFor(() ->
amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
@@ -591,30 +545,17 @@ public class TestOpportunisticContainerAllocatorAMService {
createAndStartRMWithAutoUpdateContainer();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nm1.registerNode();
+ nm1.nodeHeartbeat(oppContainersStatus, true);
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
- ApplicationAttemptId attemptId =
- app1.getCurrentAppAttempt().getAppAttemptId();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
ResourceScheduler scheduler = rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
- nm1.nodeHeartbeat(true);
-
- ((RMNodeImpl) rmNode1)
- .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-
- OpportunisticContainerContext ctxt =
- ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
- .getOpportunisticContainerContext();
- // Send add and update node events to AM Service.
- amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
- amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
-
- nm1.nodeHeartbeat(true);
+ nm1.nodeHeartbeat(oppContainersStatus, true);
GenericTestUtils.waitFor(() ->
amservice.getLeastLoadedNodes().size() == 1, 10, 10 * 100);
@@ -713,7 +654,7 @@ public class TestOpportunisticContainerAllocatorAMService {
Assert.assertEquals(Resource.newInstance(1 * GB, 1),
response.getContainersToUpdate().get(0).getResource());
- nm1.nodeHeartbeat(true);
+ nm1.nodeHeartbeat(oppContainersStatus, true);
// DEMOTE the container
allocateResponse = am1.sendContainerUpdateRequest(Arrays.asList(
UpdateContainerRequest.newInstance(3, container.getId(),
@@ -735,7 +676,7 @@ public class TestOpportunisticContainerAllocatorAMService {
uc.getContainer().getExecutionType());
// Check that the container is updated in NM through NM heartbeat response
if (response.getContainersToUpdate().size() == 0) {
- response = nm1.nodeHeartbeat(true);
+ response = nm1.nodeHeartbeat(oppContainersStatus, true);
}
Assert.assertEquals(1, response.getContainersToUpdate().size());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
@@ -761,6 +702,10 @@ public class TestOpportunisticContainerAllocatorAMService {
nodes.put(nm2.getNodeId(), nm2);
nm1.registerNode();
nm2.registerNode();
+
+ nm1.nodeHeartbeat(oppContainersStatus, true);
+ nm2.nodeHeartbeat(oppContainersStatus, true);
+
OpportunisticSchedulerMetrics metrics =
OpportunisticSchedulerMetrics.getMetrics();
@@ -777,28 +722,10 @@ public class TestOpportunisticContainerAllocatorAMService {
app1.getCurrentAppAttempt().getAppAttemptId();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
ResourceScheduler scheduler = rm.getResourceScheduler();
- RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
- RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
-
- nm1.nodeHeartbeat(true);
- nm2.nodeHeartbeat(true);
-
- ((RMNodeImpl) rmNode1)
- .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
- ((RMNodeImpl) rmNode2)
- .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
-
- OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
- .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
- // Send add and update node events to AM Service.
- amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
- amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
- amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
- amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
// All nodes 1 to 2 will be applicable for scheduling.
- nm1.nodeHeartbeat(true);
- nm2.nodeHeartbeat(true);
+ nm1.nodeHeartbeat(oppContainersStatus, true);
+ nm2.nodeHeartbeat(oppContainersStatus, true);
GenericTestUtils.waitFor(() ->
amservice.getLeastLoadedNodes().size() == 2, 10, 10 * 100);
@@ -890,6 +817,10 @@ public class TestOpportunisticContainerAllocatorAMService {
MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
nm1.registerNode();
nm2.registerNode();
+
+ nm1.nodeHeartbeat(oppContainersStatus, true);
+ nm2.nodeHeartbeat(oppContainersStatus, true);
+
OpportunisticContainerAllocatorAMService amservice =
(OpportunisticContainerAllocatorAMService) rm
.getApplicationMasterService();
@@ -900,20 +831,14 @@ public class TestOpportunisticContainerAllocatorAMService {
ResourceScheduler scheduler = rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
- nm1.nodeHeartbeat(true);
- nm2.nodeHeartbeat(true);
- ((RMNodeImpl) rmNode1)
- .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
- ((RMNodeImpl) rmNode2)
- .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+
OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
.getApplicationAttempt(attemptId).getOpportunisticContainerContext();
- // Send add and update node events to AM Service.
- amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
- amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
- amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
- amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
// Both node 1 and node 2 will be applicable for scheduling.
+ nm1.nodeHeartbeat(oppContainersStatus, true);
+ nm2.nodeHeartbeat(oppContainersStatus, true);
+
for (int i = 0; i < 10; i++) {
am1.allocate(
Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
@@ -948,10 +873,10 @@ public class TestOpportunisticContainerAllocatorAMService {
@Test(timeout = 60000)
public void testAppAttemptRemovalAfterNodeRemoval() throws Exception {
MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
+
nm.registerNode();
- OpportunisticContainerAllocatorAMService amservice =
- (OpportunisticContainerAllocatorAMService) rm
- .getApplicationMasterService();
+ nm.nodeHeartbeat(oppContainersStatus, true);
+
RMApp app = rm.submitApp(1 * GB, "app", "user", null, "default");
ApplicationAttemptId attemptId =
app.getCurrentAppAttempt().getAppAttemptId();
@@ -960,12 +885,8 @@ public class TestOpportunisticContainerAllocatorAMService {
SchedulerApplicationAttempt schedulerAttempt =
((CapacityScheduler)scheduler).getApplicationAttempt(attemptId);
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm.getNodeId());
- nm.nodeHeartbeat(true);
- ((RMNodeImpl) rmNode1)
- .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
- // Send add and update node events to AM Service.
- amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
- amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+ nm.nodeHeartbeat(oppContainersStatus, true);
GenericTestUtils.waitFor(() ->
scheduler.getNumClusterNodes() == 1, 10, 200 * 100);
@@ -1000,13 +921,18 @@ public class TestOpportunisticContainerAllocatorAMService {
RMAppAttemptState.FAILED, false));
}
+ private OpportunisticContainersStatus getOpportunisticStatus() {
+ return getOppurtunisticStatus(-1, 100, 1000);
+ }
+
private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime,
- int queueLength) {
- OpportunisticContainersStatus status1 =
- Mockito.mock(OpportunisticContainersStatus.class);
- Mockito.when(status1.getEstimatedQueueWaitTime()).thenReturn(waitTime);
- Mockito.when(status1.getWaitQueueLength()).thenReturn(queueLength);
- return status1;
+ int queueLength, int queueCapacity) {
+ OpportunisticContainersStatus status =
+ OpportunisticContainersStatus.newInstance();
+ status.setEstimatedQueueWaitTime(waitTime);
+ status.setOpportQueueCapacity(queueCapacity);
+ status.setWaitQueueLength(queueLength);
+ return status;
}
// Test if the OpportunisticContainerAllocatorAMService can handle both
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org