You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/08/07 18:34:13 UTC
hadoop git commit: YARN-4161. Capacity Scheduler : Assign single or
multiple containers per heart beat driven by configuration. (Wei Yan via
wangda)
Repository: hadoop
Updated Branches:
refs/heads/trunk a3a9c976c -> adb84f34d
YARN-4161. Capacity Scheduler : Assign single or multiple containers per heart beat driven by configuration. (Wei Yan via wangda)
Change-Id: Ic441ae4e0bf72e7232411eb54243ec143d5fd0d3
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/adb84f34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/adb84f34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/adb84f34
Branch: refs/heads/trunk
Commit: adb84f34db7e1cdcd72aa8e3deb464c48da9e353
Parents: a3a9c97
Author: Wangda Tan <wa...@apache.org>
Authored: Mon Aug 7 11:32:12 2017 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Mon Aug 7 11:32:21 2017 -0700
----------------------------------------------------------------------
.../scheduler/capacity/CapacityScheduler.java | 53 ++++-
.../CapacitySchedulerConfiguration.java | 23 ++
.../capacity/TestCapacityScheduler.java | 232 ++++++++++++++++++-
3 files changed, 289 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/adb84f34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 2ccaf63..3286982 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -94,11 +94,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidExcep
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
@@ -163,6 +161,9 @@ public class CapacityScheduler extends
private int offswitchPerHeartbeatLimit;
+ private boolean assignMultipleEnabled;
+
+ private int maxAssignPerHeartbeat;
@Override
public void setConf(Configuration conf) {
@@ -308,6 +309,9 @@ public class CapacityScheduler extends
asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+ this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
+ this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat();
+
// number of threads for async scheduling
int maxAsyncSchedulingThreads = this.conf.getInt(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
@@ -1109,17 +1113,29 @@ public class CapacityScheduler extends
.getAssignmentInformation().getReserved());
}
- private boolean canAllocateMore(CSAssignment assignment, int offswitchCount) {
- if (null != assignment && Resources.greaterThan(getResourceCalculator(),
- getClusterResource(), assignment.getResource(), Resources.none())
- && offswitchCount < offswitchPerHeartbeatLimit) {
- // And it should not be a reserved container
- if (assignment.getAssignmentInformation().getNumReservations() == 0) {
- return true;
- }
+ private boolean canAllocateMore(CSAssignment assignment, int offswitchCount,
+ int assignedContainers) {
+ // Current assignment shouldn't be empty
+ if (assignment == null
+ || Resources.equals(assignment.getResource(), Resources.none())) {
+ return false;
}
- return false;
+ // offswitch assignment should be under threshold
+ if (offswitchCount >= offswitchPerHeartbeatLimit) {
+ return false;
+ }
+
+ // And it should not be a reserved container
+ if (assignment.getAssignmentInformation().getNumReservations() > 0) {
+ return false;
+ }
+
+ // assignMultipleEnabled should be ON,
+ // and assignedContainers should be under threshold
+ return assignMultipleEnabled
+ && (maxAssignPerHeartbeat == -1
+ || assignedContainers < maxAssignPerHeartbeat);
}
/**
@@ -1131,6 +1147,7 @@ public class CapacityScheduler extends
FiCaSchedulerNode node = getNode(nodeId);
if (null != node) {
int offswitchCount = 0;
+ int assignedContainers = 0;
PlacementSet<FiCaSchedulerNode> ps = new SimplePlacementSet<>(node);
CSAssignment assignment = allocateContainersToNode(ps, withNodeHeartbeat);
@@ -1141,7 +1158,13 @@ public class CapacityScheduler extends
offswitchCount++;
}
- while (canAllocateMore(assignment, offswitchCount)) {
+ if (Resources.greaterThan(calculator, getClusterResource(),
+ assignment.getResource(), Resources.none())) {
+ assignedContainers++;
+ }
+
+ while (canAllocateMore(assignment, offswitchCount,
+ assignedContainers)) {
// Try to see if it is possible to allocate multiple container for
// the same node heartbeat
assignment = allocateContainersToNode(ps, true);
@@ -1150,6 +1173,12 @@ public class CapacityScheduler extends
&& assignment.getType() == NodeType.OFF_SWITCH) {
offswitchCount++;
}
+
+ if (null != assignment
+ && Resources.greaterThan(calculator, getClusterResource(),
+ assignment.getResource(), Resources.none())) {
+ assignedContainers++;
+ }
}
if (offswitchCount >= offswitchPerHeartbeatLimit) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/adb84f34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 1e29d50..13b9ff6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -301,6 +301,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false;
+ @Private
+ public static final String ASSIGN_MULTIPLE_ENABLED = PREFIX
+ + "per-node-heartbeat.multiple-assignments-enabled";
+
+ @Private
+ public static final boolean DEFAULT_ASSIGN_MULTIPLE_ENABLED = true;
+
+ /** Maximum number of containers to assign on each check-in. */
+ @Private
+ public static final String MAX_ASSIGN_PER_HEARTBEAT = PREFIX
+ + "per-node-heartbeat.maximum-container-assignments";
+
+ @Private
+ public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1;
+
AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser();
public CapacitySchedulerConfiguration() {
@@ -1473,4 +1488,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
}
return userWeights;
}
+
+ public boolean getAssignMultipleEnabled() {
+ return getBoolean(ASSIGN_MULTIPLE_ENABLED, DEFAULT_ASSIGN_MULTIPLE_ENABLED);
+ }
+
+ public int getMaxAssignPerHeartbeat() {
+ return getInt(MAX_ASSIGN_PER_HEARTBEAT, DEFAULT_MAX_ASSIGN_PER_HEARTBEAT);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/adb84f34/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index f51f771..64e0df4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -233,6 +233,17 @@ public class TestCapacityScheduler {
}
}
+ private NodeManager registerNode(ResourceManager rm, String hostName,
+ int containerManagerPort, int httpPort, String rackName,
+ Resource capability) throws IOException, YarnException {
+ NodeManager nm = new NodeManager(hostName,
+ containerManagerPort, httpPort, rackName, capability, rm);
+ NodeAddedSchedulerEvent nodeAddEvent1 =
+ new NodeAddedSchedulerEvent(rm.getRMContext().getRMNodes()
+ .get(nm.getNodeId()));
+ rm.getResourceScheduler().handle(nodeAddEvent1);
+ return nm;
+ }
@Test (timeout = 30000)
public void testConfValidation() throws Exception {
@@ -267,12 +278,12 @@ public class TestCapacityScheduler {
}
}
- private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
+ private NodeManager
registerNode(String hostName, int containerManagerPort, int httpPort,
String rackName, Resource capability)
throws IOException, YarnException {
- org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
- new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(
+ NodeManager nm =
+ new NodeManager(
hostName, containerManagerPort, httpPort, rackName, capability,
resourceManager);
NodeAddedSchedulerEvent nodeAddEvent1 =
@@ -400,8 +411,216 @@ public class TestCapacityScheduler {
LOG.info("--- END: testCapacityScheduler ---");
}
- private void nodeUpdate(
- org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm) {
+ @Test
+ public void testNotAssignMultiple() throws Exception {
+ LOG.info("--- START: testNotAssignMultiple ---");
+ ResourceManager rm = new ResourceManager() {
+ @Override
+ protected RMNodeLabelsManager createNodeLabelManager() {
+ RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+ mgr.init(getConfig());
+ return mgr;
+ }
+ };
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ csConf.setBoolean(
+ CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, false);
+ setupQueueConfiguration(csConf);
+ YarnConfiguration conf = new YarnConfiguration(csConf);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ rm.init(conf);
+ rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+ rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
+ ((AsyncDispatcher) rm.getRMContext().getDispatcher()).start();
+ RMContext mC = mock(RMContext.class);
+ when(mC.getConfigurationProvider()).thenReturn(
+ new LocalConfigurationProvider());
+
+ // Register node1
+ String host0 = "host_0";
+ NodeManager nm0 =
+ registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+ Resources.createResource(10 * GB, 10));
+
+ // ResourceRequest priorities
+ Priority priority0 = Priority.newInstance(0);
+ Priority priority1 = Priority.newInstance(1);
+
+ // Submit an application
+ Application application0 = new Application("user_0", "a1", rm);
+ application0.submit();
+ application0.addNodeManager(host0, 1234, nm0);
+
+ Resource capability00 = Resources.createResource(1 * GB, 1);
+ application0.addResourceRequestSpec(priority0, capability00);
+
+ Resource capability01 = Resources.createResource(2 * GB, 1);
+ application0.addResourceRequestSpec(priority1, capability01);
+
+ Task task00 =
+ new Task(application0, priority0, new String[] {host0});
+ Task task01 =
+ new Task(application0, priority1, new String[] {host0});
+ application0.addTask(task00);
+ application0.addTask(task01);
+
+ // Submit another application
+ Application application1 = new Application("user_1", "b2", rm);
+ application1.submit();
+ application1.addNodeManager(host0, 1234, nm0);
+
+ Resource capability10 = Resources.createResource(3 * GB, 1);
+ application1.addResourceRequestSpec(priority0, capability10);
+
+ Resource capability11 = Resources.createResource(4 * GB, 1);
+ application1.addResourceRequestSpec(priority1, capability11);
+
+ Task task10 = new Task(application1, priority0, new String[] {host0});
+ Task task11 = new Task(application1, priority1, new String[] {host0});
+ application1.addTask(task10);
+ application1.addTask(task11);
+
+ // Send resource requests to the scheduler
+ application0.schedule();
+
+ application1.schedule();
+
+ // Send a heartbeat to kick the tires on the Scheduler
+ LOG.info("Kick!");
+
+ // task00, used=1G
+ nodeUpdate(rm, nm0);
+
+ // Get allocations from the scheduler
+ application0.schedule();
+ application1.schedule();
+ // 1 Task per heart beat should be scheduled
+ checkNodeResourceUsage(3 * GB, nm0); // task10 (3G)
+ checkApplicationResourceUsage(0 * GB, application0);
+ checkApplicationResourceUsage(3 * GB, application1);
+
+ // Another heartbeat
+ nodeUpdate(rm, nm0);
+ application0.schedule();
+ checkApplicationResourceUsage(1 * GB, application0);
+ application1.schedule();
+ checkApplicationResourceUsage(3 * GB, application1);
+ checkNodeResourceUsage(4 * GB, nm0);
+ LOG.info("--- END: testNotAssignMultiple ---");
+ }
+
+ @Test
+ public void testAssignMultiple() throws Exception {
+ LOG.info("--- START: testAssignMultiple ---");
+ ResourceManager rm = new ResourceManager() {
+ @Override
+ protected RMNodeLabelsManager createNodeLabelManager() {
+ RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+ mgr.init(getConfig());
+ return mgr;
+ }
+ };
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration();
+ csConf.setBoolean(
+ CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, true);
+ // Each heartbeat will assign 2 containers at most
+ csConf.setInt(CapacitySchedulerConfiguration.MAX_ASSIGN_PER_HEARTBEAT, 2);
+ setupQueueConfiguration(csConf);
+ YarnConfiguration conf = new YarnConfiguration(csConf);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ rm.init(conf);
+ rm.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+ rm.getRMContext().getNMTokenSecretManager().rollMasterKey();
+ ((AsyncDispatcher) rm.getRMContext().getDispatcher()).start();
+ RMContext mC = mock(RMContext.class);
+ when(mC.getConfigurationProvider()).thenReturn(
+ new LocalConfigurationProvider());
+
+ // Register node1
+ String host0 = "host_0";
+ NodeManager nm0 =
+ registerNode(rm, host0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
+ Resources.createResource(10 * GB, 10));
+
+ // ResourceRequest priorities
+ Priority priority0 = Priority.newInstance(0);
+ Priority priority1 = Priority.newInstance(1);
+
+ // Submit an application
+ Application application0 = new Application("user_0", "a1", rm);
+ application0.submit();
+ application0.addNodeManager(host0, 1234, nm0);
+
+ Resource capability00 = Resources.createResource(1 * GB, 1);
+ application0.addResourceRequestSpec(priority0, capability00);
+
+ Resource capability01 = Resources.createResource(2 * GB, 1);
+ application0.addResourceRequestSpec(priority1, capability01);
+
+ Task task00 = new Task(application0, priority0, new String[] {host0});
+ Task task01 = new Task(application0, priority1, new String[] {host0});
+ application0.addTask(task00);
+ application0.addTask(task01);
+
+ // Submit another application
+ Application application1 = new Application("user_1", "b2", rm);
+ application1.submit();
+ application1.addNodeManager(host0, 1234, nm0);
+
+ Resource capability10 = Resources.createResource(3 * GB, 1);
+ application1.addResourceRequestSpec(priority0, capability10);
+
+ Resource capability11 = Resources.createResource(4 * GB, 1);
+ application1.addResourceRequestSpec(priority1, capability11);
+
+ Task task10 =
+ new Task(application1, priority0, new String[] {host0});
+ Task task11 =
+ new Task(application1, priority1, new String[] {host0});
+ application1.addTask(task10);
+ application1.addTask(task11);
+
+ // Send resource requests to the scheduler
+ application0.schedule();
+
+ application1.schedule();
+
+ // Send a heartbeat to kick the tires on the Scheduler
+ LOG.info("Kick!");
+
+ // task_0_0, used=1G
+ nodeUpdate(rm, nm0);
+
+ // Get allocations from the scheduler
+ application0.schedule();
+ application1.schedule();
+ // 2 tasks per heart beat should be scheduled (MAX_ASSIGN_PER_HEARTBEAT = 2)
+ checkNodeResourceUsage(4 * GB, nm0); // task00 (1G) + task10 (3G)
+ checkApplicationResourceUsage(1 * GB, application0);
+ checkApplicationResourceUsage(3 * GB, application1);
+
+ // Another heartbeat
+ nodeUpdate(rm, nm0);
+ application0.schedule();
+ checkApplicationResourceUsage(3 * GB, application0);
+ application1.schedule();
+ checkApplicationResourceUsage(7 * GB, application1);
+ checkNodeResourceUsage(10 * GB, nm0);
+ LOG.info("--- END: testAssignMultiple ---");
+ }
+
+ private void nodeUpdate(ResourceManager rm, NodeManager nm) {
+ RMNode node = rm.getRMContext().getRMNodes().get(nm.getNodeId());
+ // Send a heartbeat to kick the tires on the Scheduler
+ NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+ rm.getResourceScheduler().handle(nodeUpdate);
+ }
+
+ private void nodeUpdate(NodeManager nm) {
RMNode node = resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
// Send a heartbeat to kick the tires on the Scheduler
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
@@ -699,8 +918,7 @@ public class TestCapacityScheduler {
Assert.assertEquals(expected, application.getUsedResources().getMemorySize());
}
- private void checkNodeResourceUsage(int expected,
- org.apache.hadoop.yarn.server.resourcemanager.NodeManager node) {
+ private void checkNodeResourceUsage(int expected, NodeManager node) {
Assert.assertEquals(expected, node.getUsed().getMemorySize());
node.checkResourceUsage();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org