You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/01/30 19:29:07 UTC
[28/37] hadoop git commit: YARN-7790. Improve Capacity Scheduler Async Scheduling to better handle node failures. Contributed by Wangda Tan.
YARN-7790. Improve Capacity Scheduler Async Scheduling to better handle node failures. Contributed by Wangda Tan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e9c72d04
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e9c72d04
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e9c72d04
Branch: refs/heads/HDFS-7240
Commit: e9c72d04beddfe0252d2e81123a9fe66bdf04078
Parents: 3400d0c
Author: Sunil G <su...@apache.org>
Authored: Mon Jan 29 20:43:08 2018 +0530
Committer: Sunil G <su...@apache.org>
Committed: Mon Jan 29 20:44:38 2018 +0530
----------------------------------------------------------------------
.../scheduler/AbstractYarnScheduler.java | 51 +++---
.../scheduler/SchedulerNode.java | 16 ++
.../scheduler/capacity/CapacityScheduler.java | 49 +++++-
.../TestRMHAForAsyncScheduler.java | 38 ++++-
.../TestCapacitySchedulerAsyncScheduling.java | 159 ++++++++++++++++++-
5 files changed, 276 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index c94c379..4b76327 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -980,11 +980,11 @@ public abstract class AbstractYarnScheduler
/**
* Get lists of new containers from NodeManager and process them.
* @param nm The RMNode corresponding to the NodeManager
+ * @param schedulerNode schedulerNode
* @return list of completed containers
*/
- protected List<ContainerStatus> updateNewContainerInfo(RMNode nm) {
- SchedulerNode node = getNode(nm.getNodeID());
-
+ private List<ContainerStatus> updateNewContainerInfo(RMNode nm,
+ SchedulerNode schedulerNode) {
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers =
new ArrayList<>();
@@ -999,14 +999,15 @@ public abstract class AbstractYarnScheduler
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
- containerLaunchedOnNode(launchedContainer.getContainerId(), node);
+ containerLaunchedOnNode(launchedContainer.getContainerId(),
+ schedulerNode);
}
// Processing the newly increased containers
List<Container> newlyIncreasedContainers =
nm.pullNewlyIncreasedContainers();
for (Container container : newlyIncreasedContainers) {
- containerIncreasedOnNode(container.getId(), node, container);
+ containerIncreasedOnNode(container.getId(), schedulerNode, container);
}
return completedContainers;
@@ -1017,12 +1018,12 @@ public abstract class AbstractYarnScheduler
* @param completedContainers Extracted list of completed containers
* @param releasedResources Reference resource object for completed containers
* @param nodeId NodeId corresponding to the NodeManager
+ * @param schedulerNode schedulerNode
* @return The total number of released containers
*/
- protected int updateCompletedContainers(List<ContainerStatus>
- completedContainers, Resource releasedResources, NodeId nodeId) {
+ private int updateCompletedContainers(List<ContainerStatus> completedContainers,
+ Resource releasedResources, NodeId nodeId, SchedulerNode schedulerNode) {
int releasedContainers = 0;
- SchedulerNode node = getNode(nodeId);
List<ContainerId> untrackedContainerIdList = new ArrayList<ContainerId>();
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
@@ -1030,8 +1031,8 @@ public abstract class AbstractYarnScheduler
RMContainer container = getRMContainer(containerId);
completedContainer(container,
completedContainer, RMContainerEventType.FINISHED);
- if (node != null) {
- node.releaseContainer(containerId, true);
+ if (schedulerNode != null) {
+ schedulerNode.releaseContainer(containerId, true);
}
if (container != null) {
@@ -1076,14 +1077,14 @@ public abstract class AbstractYarnScheduler
/**
* Update container and utilization information on the NodeManager.
* @param nm The NodeManager to update
+ * @param schedulerNode schedulerNode
*/
- protected void updateNodeResourceUtilization(RMNode nm) {
- SchedulerNode node = getNode(nm.getNodeID());
+ protected void updateNodeResourceUtilization(RMNode nm,
+ SchedulerNode schedulerNode) {
// Updating node resource utilization
- node.setAggregatedContainersUtilization(
+ schedulerNode.setAggregatedContainersUtilization(
nm.getAggregatedContainersUtilization());
- node.setNodeUtilization(nm.getNodeUtilization());
-
+ schedulerNode.setNodeUtilization(nm.getNodeUtilization());
}
/**
@@ -1097,12 +1098,17 @@ public abstract class AbstractYarnScheduler
}
// Process new container information
- List<ContainerStatus> completedContainers = updateNewContainerInfo(nm);
+ SchedulerNode schedulerNode = getNode(nm.getNodeID());
+ List<ContainerStatus> completedContainers = updateNewContainerInfo(nm,
+ schedulerNode);
+
+ // Notify Scheduler Node updated.
+ schedulerNode.notifyNodeUpdate();
// Process completed containers
Resource releasedResources = Resource.newInstance(0, 0);
int releasedContainers = updateCompletedContainers(completedContainers,
- releasedResources, nm.getNodeID());
+ releasedResources, nm.getNodeID(), schedulerNode);
// If the node is decommissioning, send an update to have the total
// resource equal to the used resource, so no available resource to
@@ -1115,18 +1121,17 @@ public abstract class AbstractYarnScheduler
.getEventHandler()
.handle(
new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
- .newInstance(getSchedulerNode(nm.getNodeID())
- .getAllocatedResource(), 0)));
+ .newInstance(schedulerNode.getAllocatedResource(), 0)));
}
updateSchedulerHealthInformation(releasedResources, releasedContainers);
- updateNodeResourceUtilization(nm);
+ updateNodeResourceUtilization(nm, schedulerNode);
// Now node data structures are up-to-date and ready for scheduling.
if(LOG.isDebugEnabled()) {
- SchedulerNode node = getNode(nm.getNodeID());
- LOG.debug("Node being looked for scheduling " + nm +
- " availableResource: " + node.getUnallocatedResource());
+ LOG.debug(
+ "Node being looked for scheduling " + nm + " availableResource: "
+ + schedulerNode.getUnallocatedResource());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 05dbf1e..89f748d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
@@ -76,6 +77,9 @@ public abstract class SchedulerNode {
private volatile Set<String> labels = null;
+ // Last updated time
+ private volatile long lastHeartbeatMonotonicTime;
+
public SchedulerNode(RMNode node, boolean usePortForNodeName,
Set<String> labels) {
this.rmNode = node;
@@ -87,6 +91,7 @@ public abstract class SchedulerNode {
nodeName = rmNode.getHostName();
}
this.labels = ImmutableSet.copyOf(labels);
+ this.lastHeartbeatMonotonicTime = Time.monotonicNow();
}
public SchedulerNode(RMNode node, boolean usePortForNodeName) {
@@ -453,6 +458,17 @@ public abstract class SchedulerNode {
return this.nodeUtilization;
}
+  /**
+   * @return the {@link Time#monotonicNow()} timestamp recorded when this
+   *         node was constructed or at the most recent
+   *         {@link #notifyNodeUpdate()} call
+   */
+  public long getLastHeartbeatMonotonicTime() {
+    return this.lastHeartbeatMonotonicTime;
+  }
+
+  /**
+   * Stamp the current monotonic time as this node's last heartbeat.
+   * Intended to be invoked once per node heartbeat.
+   */
+  public void notifyNodeUpdate() {
+    lastHeartbeatMonotonicTime = Time.monotonicNow();
+  }
+
private static class ContainerInfo {
private final RMContainer container;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 99f4456..03ca507 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -142,7 +142,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleC
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.Lock;
-import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -181,8 +180,6 @@ public class CapacityScheduler extends
private CSConfigurationProvider csConfProvider;
- protected Clock monotonicClock;
-
@Override
public void setConf(Configuration conf) {
yarnConf = conf;
@@ -243,6 +240,8 @@ public class CapacityScheduler extends
private RMNodeLabelsManager labelManager;
private AppPriorityACLsManager appPriorityACLManager;
+ private static boolean printedVerboseLoggingForAsyncScheduling = false;
+
/**
* EXPERT
*/
@@ -471,6 +470,22 @@ public class CapacityScheduler extends
private final static Random random = new Random(System.currentTimeMillis());
+  /**
+   * Decide whether the async scheduler should skip a node in this round.
+   * A node whose last heartbeat is older than twice the configured NM
+   * heartbeat interval is skipped, since it might be dead.
+   *
+   * @param node node being considered for allocation
+   * @param cs scheduler supplying the NM heartbeat interval
+   * @param printVerboseLog whether a DEBUG line may be logged for a skip
+   * @return true if no containers should be allocated on the node now
+   */
+  private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
+      CapacityScheduler cs, boolean printVerboseLog) {
+    // Skip node which missed 2 heartbeats since the node might be dead and
+    // we should not continue allocate containers on that.
+    long timeElapsedFromLastHeartbeat =
+        Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
+    if (timeElapsedFromLastHeartbeat > cs.nmHeartbeatInterval * 2) {
+      if (printVerboseLog && LOG.isDebugEnabled()) {
+        // Name the node so the skip can be traced to a specific NM.
+        LOG.debug("Skip scheduling on node " + node.getNodeID()
+            + " because it hasn't heartbeated for "
+            + timeElapsedFromLastHeartbeat / 1000.0f + " secs");
+      }
+      return true;
+    }
+    return false;
+  }
+
/**
* Schedule on all nodes by starting at a random point.
* @param cs
@@ -481,16 +496,42 @@ public class CapacityScheduler extends
Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
int start = random.nextInt(nodes.size());
+ // To avoid too verbose DEBUG logging, only print debug log once for
+ // every 10 secs.
+ boolean printSkipedNodeLogging = false;
+ if (Time.monotonicNow() / 1000 % 10 == 0) {
+ printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
+ } else {
+ printedVerboseLoggingForAsyncScheduling = false;
+ }
+
+ // Allocate containers of node [start, end)
for (FiCaSchedulerNode node : nodes) {
if (current++ >= start) {
+ if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+ continue;
+ }
cs.allocateContainersToNode(node.getNodeID(), false);
}
}
- // Now, just get everyone to be safe
+
+ current = 0;
+
+ // Allocate containers of node [0, start)
for (FiCaSchedulerNode node : nodes) {
+ if (current++ > start) {
+ break;
+ }
+ if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+ continue;
+ }
cs.allocateContainersToNode(node.getNodeID(), false);
}
+ if (printSkipedNodeLogging) {
+ printedVerboseLoggingForAsyncScheduling = true;
+ }
+
Thread.sleep(cs.getAsyncScheduleInterval());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
index 46d5cda..36f1762 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java
@@ -28,13 +28,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAsyncScheduling;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.util.Arrays;
+import java.util.List;
+
public class TestRMHAForAsyncScheduler extends RMHATestBase {
+ private TestCapacitySchedulerAsyncScheduling.NMHeartbeatThread
+ nmHeartbeatThread = null;
@Before
@Override
@@ -57,26 +63,49 @@ public class TestRMHAForAsyncScheduler extends RMHATestBase {
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
}
+  /**
+   * Start sending periodic heartbeats for the given NMs, replacing any
+   * heartbeat thread that is already running.
+   *
+   * @param mockNMs NMs to heartbeat
+   * @param interval sleep between heartbeat rounds, in milliseconds
+   */
+  private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
+    // Stop any previous thread through the single shared stop path.
+    pauseNMHeartbeat();
+    nmHeartbeatThread =
+        new TestCapacitySchedulerAsyncScheduling.NMHeartbeatThread(mockNMs,
+            interval);
+    nmHeartbeatThread.start();
+  }
+
+  /**
+   * Stop the running heartbeat thread, if any, and wait for it to exit so
+   * that no further heartbeats are sent once this method returns.
+   */
+  private void pauseNMHeartbeat() {
+    if (nmHeartbeatThread == null) {
+      return;
+    }
+    nmHeartbeatThread.setShouldStop();
+    try {
+      // Without the join, a heartbeat round already in progress could
+      // still reach the RM after the caller believes heartbeats stopped.
+      nmHeartbeatThread.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    nmHeartbeatThread = null;
+  }
+
@Test(timeout = 60000)
public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
// start two RMs, and transit rm1 to active, rm2 to standby
startRMs();
// register NM
- rm1.registerNode("h1:1234", 8192, 8);
+ MockNM nm = rm1.registerNode("192.1.1.1:1234", 8192, 8);
// submit app1 and check
RMApp app1 = submitAppAndCheckLaunched(rm1);
+ keepNMHeartbeat(Arrays.asList(nm), 1000);
// failover RM1 to RM2
explicitFailover();
checkAsyncSchedulerThreads(Thread.currentThread());
+ pauseNMHeartbeat();
// register NM, kill app1
- rm2.registerNode("h1:1234", 8192, 8);
+ nm = rm2.registerNode("192.1.1.1:1234", 8192, 8);
+ keepNMHeartbeat(Arrays.asList(nm), 1000);
+
rm2.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.LAUNCHED);
rm2.killApp(app1.getApplicationId());
// submit app3 and check
RMApp app2 = submitAppAndCheckLaunched(rm2);
+ pauseNMHeartbeat();
// failover RM2 to RM1
HAServiceProtocol.StateChangeRequestInfo requestInfo =
@@ -92,12 +121,15 @@ public class TestRMHAForAsyncScheduler extends RMHATestBase {
checkAsyncSchedulerThreads(Thread.currentThread());
// register NM, kill app2
- rm1.registerNode("h1:1234", 8192, 8);
+ nm = rm1.registerNode("192.1.1.1:1234", 8192, 8);
+ keepNMHeartbeat(Arrays.asList(nm), 1000);
+
rm1.waitForState(app2.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.LAUNCHED);
rm1.killApp(app2.getApplicationId());
// submit app3 and check
submitAppAndCheckLaunched(rm1);
+ pauseNMHeartbeat();
rm1.stop();
rm2.stop();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9c72d04/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 77596e2..548b909 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
@@ -72,6 +73,8 @@ public class TestCapacitySchedulerAsyncScheduling {
RMNodeLabelsManager mgr;
+ private NMHeartbeatThread nmHeartbeatThread = null;
+
@Before
public void setUp() throws Exception {
conf = new YarnConfiguration();
@@ -122,9 +125,11 @@ public class TestCapacitySchedulerAsyncScheduling {
List<MockNM> nms = new ArrayList<>();
// Add 10 nodes to the cluster, in the cluster we have 200 GB resource
for (int i = 0; i < 10; i++) {
- nms.add(rm.registerNode("h-" + i + ":1234", 20 * GB));
+ nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB));
}
+ keepNMHeartbeat(nms, 1000);
+
List<MockAM> ams = new ArrayList<>();
// Add 3 applications to the cluster, one app in one queue
// the i-th app ask (20 * i) containers. So in total we will have
@@ -185,8 +190,8 @@ public class TestCapacitySchedulerAsyncScheduling {
// init RM & NMs & Nodes
final MockRM rm = new MockRM(disableAsyncConf);
rm.start();
- final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
- final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
+ final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 9 * GB);
+ final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 9 * GB);
List<MockNM> nmLst = new ArrayList<>();
nmLst.add(nm1);
nmLst.add(nm2);
@@ -277,8 +282,8 @@ public class TestCapacitySchedulerAsyncScheduling {
// init RM & NMs & Nodes
final MockRM rm = new MockRM(disableAsyncConf);
rm.start();
- final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
- final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
+ final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9 * GB);
+ final MockNM nm2 = rm.registerNode("127.0.0.2:2234", 9 * GB);
// init scheduler nodes
int waitTime = 1000;
@@ -416,8 +421,8 @@ public class TestCapacitySchedulerAsyncScheduling {
// init RM & NMs & Nodes
final MockRM rm = new MockRM(disableAsyncConf);
rm.start();
- final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
- final MockNM nm2 = rm.registerNode("h2:1234", 9 * GB);
+ final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9 * GB);
+ final MockNM nm2 = rm.registerNode("127.0.0.2:1234", 9 * GB);
List<MockNM> nmLst = new ArrayList<>();
nmLst.add(nm1);
nmLst.add(nm2);
@@ -476,6 +481,146 @@ public class TestCapacitySchedulerAsyncScheduling {
rm.stop();
}
+  /**
+   * Make sure the scheduler skips NMs which haven't heartbeated for a while.
+   *
+   * All 10 NMs heartbeat while the AMs are launched. Heartbeats then stop
+   * for three heartbeat intervals, after which only NM 0-4 heartbeat again.
+   * The non-AM containers requested afterwards must land on NM 0-4 only.
+   * @throws Exception
+   */
+  @Test
+  public void testAsyncSchedulerSkipNoHeartbeatNMs() throws Exception {
+    int heartbeatInterval = 100;
+    conf.setInt(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+        1);
+    conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+        + ".scheduling-interval-ms", 100);
+    // Heartbeat interval is 100 ms.
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+        heartbeatInterval);
+
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+
+    // inject node label manager
+    MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+
+    try {
+      List<MockNM> nms = new ArrayList<>();
+      // Add 10 nodes to the cluster, in the cluster we have 200 GB resource
+      for (int i = 0; i < 10; i++) {
+        nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB));
+      }
+
+      List<MockAM> ams = new ArrayList<>();
+
+      keepNMHeartbeat(nms, heartbeatInterval);
+
+      for (int i = 0; i < 3; i++) {
+        RMApp rmApp = rm.submitApp(1024, "app", "user", null, false,
+            Character.toString((char) (i % 34 + 97)), 1, null, null, false);
+        MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
+        am.registerAppAttempt();
+        ams.add(am);
+      }
+
+      pauseNMHeartbeat();
+
+      // Let every NM miss more than two heartbeat intervals.
+      Thread.sleep(heartbeatInterval * 3);
+
+      // Applications request containers.
+      for (int i = 0; i < 3; i++) {
+        ams.get(i).allocate("*", 1024, 20 * (i + 1), new ArrayList<>());
+      }
+
+      for (int i = 0; i < 5; i++) {
+        // Do heartbeat for NM 0-4
+        nms.get(i).nodeHeartbeat(true);
+      }
+
+      // Wait for 2000 ms.
+      Thread.sleep(2000);
+
+      // NM 0-4 must have non-AM containers and NM 5-9 must not. Check
+      // every NM, including NM 9.
+      for (int i = 0; i < nms.size(); i++) {
+        int nonAMContainers = checkNumNonAMContainersOnNode(cs, nms.get(i));
+        if (i < 5) {
+          Assert.assertTrue("NM " + i + " should have non-AM containers",
+              nonAMContainers > 0);
+        } else {
+          Assert.assertEquals("NM " + i + " should have no non-AM containers",
+              0, nonAMContainers);
+        }
+      }
+    } finally {
+      // Do not leak the heartbeat thread or the RM, even when an
+      // assertion above fails.
+      pauseNMHeartbeat();
+      rm.close();
+    }
+  }
+
+  /**
+   * Test helper thread that heartbeats every given mock NM, then sleeps
+   * {@code interval} ms, repeating until {@link #setShouldStop()} is called
+   * or the thread is interrupted.
+   */
+  public static class NMHeartbeatThread extends Thread {
+    private final List<MockNM> mockNMS;
+    private final int interval;
+    private volatile boolean shouldStop = false;
+
+    public NMHeartbeatThread(List<MockNM> mockNMs, int interval) {
+      this.mockNMS = mockNMs;
+      this.interval = interval;
+    }
+
+    @Override
+    public void run() {
+      while (!shouldStop) {
+        for (MockNM nm : mockNMS) {
+          try {
+            nm.nodeHeartbeat(true);
+          } catch (Exception e) {
+            // Best effort: keep heartbeating the remaining NMs even if
+            // one heartbeat throws.
+            e.printStackTrace();
+          }
+        }
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          // Treat interruption as a stop request. Restore the flag and
+          // exit instead of silently looping on.
+          Thread.currentThread().interrupt();
+          return;
+        }
+      }
+    }
+
+    /** Ask the thread to stop before its next heartbeat round. */
+    public void setShouldStop() {
+      shouldStop = true;
+    }
+  }
+
+  /**
+   * Start sending periodic heartbeats for the given NMs, replacing any
+   * heartbeat thread that is already running.
+   *
+   * @param mockNMs NMs to heartbeat
+   * @param interval sleep between heartbeat rounds, in milliseconds
+   */
+  private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
+    // Stop any previous thread through the single shared stop path.
+    pauseNMHeartbeat();
+    nmHeartbeatThread = new NMHeartbeatThread(mockNMs, interval);
+    nmHeartbeatThread.start();
+  }
+
+  /**
+   * Stop the running heartbeat thread, if any, and wait for it to exit so
+   * that no further heartbeats are sent once this method returns.
+   */
+  private void pauseNMHeartbeat() {
+    if (nmHeartbeatThread == null) {
+      return;
+    }
+    nmHeartbeatThread.setShouldStop();
+    try {
+      // Without the join, a heartbeat round already in progress could
+      // still reach the RM after the caller believes heartbeats stopped.
+      nmHeartbeatThread.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    nmHeartbeatThread = null;
+  }
+
+  /**
+   * Count the running containers on the scheduler node backing {@code nm}
+   * that are not AM containers.
+   */
+  private int checkNumNonAMContainersOnNode(CapacityScheduler cs, MockNM nm) {
+    int count = 0;
+    for (RMContainer rmContainer : cs.getNode(nm.getNodeId())
+        .getCopiedListOfRunningContainers()) {
+      count += rmContainer.isAMContainer() ? 0 : 1;
+    }
+    return count;
+  }
+
private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
int nContainer, Resource resource, int priority, int startContainerId)
throws Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org