You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/04/22 16:49:14 UTC
[hadoop] branch trunk updated: YARN-2889. Limit the number of
opportunistic container allocated per AM heartbeat. Contributed by Abhishek
Modi.
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 96e3027 YARN-2889. Limit the number of opportunistic container allocated per AM heartbeat. Contributed by Abhishek Modi.
96e3027 is described below
commit 96e3027e46a953ca995e4b44ef50bc2a30c7e838
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Mon Apr 22 09:49:03 2019 -0700
YARN-2889. Limit the number of opportunistic container allocated per AM heartbeat. Contributed by Abhishek Modi.
---
.../apache/hadoop/yarn/conf/YarnConfiguration.java | 11 ++
.../src/main/resources/yarn-default.xml | 9 +
.../scheduler/OpportunisticContainerAllocator.java | 70 +++++++-
.../TestOpportunisticContainerAllocator.java | 186 +++++++++++++++++++++
.../yarn/server/nodemanager/NodeManager.java | 8 +-
.../OpportunisticContainerAllocatorAMService.java | 7 +-
6 files changed, 283 insertions(+), 8 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 860227e..b21d763 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -402,6 +402,17 @@ public class YarnConfiguration extends Configuration {
public static final boolean
DEFAULT_OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = false;
+ /**
+ * Maximum number of opportunistic containers to be allocated per AM
+ * heartbeat. A zero or negative value (the default is -1) means no limit.
+ */
+ @Unstable
+ public static final String
+ OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT =
+ RM_PREFIX + "opportunistic.max.container-allocation.per.am.heartbeat";
+ public static final int
+ DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT = -1;
+
/** Number of nodes to be used by the Opportunistic Container allocator for
* dispatching containers during container allocation. */
@Unstable
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e865942..a00b5d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3342,6 +3342,15 @@
<property>
<description>
+ Maximum number of opportunistic containers to be allocated per
+ Application Master heartbeat. A zero or negative value (default -1) disables the limit.
+ </description>
+ <name>yarn.resourcemanager.opportunistic.max.container-allocation.per.am.heartbeat</name>
+ <value>-1</value>
+ </property>
+
+ <property>
+ <description>
Number of nodes to be used by the Opportunistic Container Allocator for
dispatching containers during container allocation.
</description>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index b31bd69..10c2402 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.scheduler;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
@@ -70,6 +71,8 @@ public class OpportunisticContainerAllocator {
private static final int RACK_LOCAL_LOOP = 1;
private static final int OFF_SWITCH_LOOP = 2;
+ private int maxAllocationsPerAMHeartbeat = -1; // <= 0 means no limit
+
/**
* This class encapsulates application specific parameters used to build a
* Container.
@@ -292,6 +295,24 @@ public class OpportunisticContainerAllocator {
}
/**
+ * Create a new Opportunistic Container Allocator.
+ * @param tokenSecretManager container token secret manager
+ * @param maxAllocationsPerAMHeartbeat max number of containers to be
+ * allocated in one AM heartbeat; zero or negative means no limit
+ */
+ public OpportunisticContainerAllocator(
+ BaseContainerTokenSecretManager tokenSecretManager,
+ int maxAllocationsPerAMHeartbeat) {
+ this.tokenSecretManager = tokenSecretManager;
+ this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat;
+ }
+
+ @VisibleForTesting // test hook: overrides the limit set by the constructor
+ void setMaxAllocationsPerAMHeartbeat(int maxAllocationsPerAMHeartbeat) {
+ this.maxAllocationsPerAMHeartbeat = maxAllocationsPerAMHeartbeat;
+ }
+
+ /**
* Allocate OPPORTUNISTIC containers.
* @param blackList Resource BlackList Request
* @param oppResourceReqs Opportunistic Resource Requests
@@ -316,7 +337,6 @@ public class OpportunisticContainerAllocator {
// Add OPPORTUNISTIC requests to the outstanding ones.
opportContext.addToOutstandingReqs(oppResourceReqs);
-
Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist());
Set<String> allocatedNodes = new HashSet<>();
List<Container> allocatedContainers = new ArrayList<>();
@@ -334,9 +354,21 @@ public class OpportunisticContainerAllocator {
// might be different than what is requested, which is why
// we need the requested capability (key) to match against
// the outstanding reqs)
+ int remAllocs = -1;
+ if (maxAllocationsPerAMHeartbeat > 0) {
+ remAllocs =
+ maxAllocationsPerAMHeartbeat - allocatedContainers.size()
+ - getTotalAllocations(allocations);
+ if (remAllocs <= 0) {
+ LOG.info("Not allocating more containers as we have reached max "
+ + "allocations per AM heartbeat {}",
+ maxAllocationsPerAMHeartbeat);
+ break;
+ }
+ }
Map<Resource, List<Allocation>> allocation = allocate(
rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
- appSubmitter, nodeBlackList, allocatedNodes);
+ appSubmitter, nodeBlackList, allocatedNodes, remAllocs);
if (allocation.size() > 0) {
allocations.add(allocation);
continueLoop = true;
@@ -356,17 +388,42 @@ public class OpportunisticContainerAllocator {
return allocatedContainers;
}
+ private int getTotalAllocations(
+ List<Map<Resource, List<Allocation>>> allocations) {
+ int totalAllocs = 0; // Allocation entries across all maps and Resource keys
+ for (Map<Resource, List<Allocation>> allocation : allocations) {
+ for (List<Allocation> allocs : allocation.values()) {
+ totalAllocs += allocs.size();
+ }
+ }
+ return totalAllocs;
+ }
+
private Map<Resource, List<Allocation>> allocate(long rmIdentifier,
OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
ApplicationAttemptId appAttId, String userName, Set<String> blackList,
- Set<String> allocatedNodes)
+ Set<String> allocatedNodes, int maxAllocations)
throws YarnException {
Map<Resource, List<Allocation>> containers = new HashMap<>();
for (EnrichedResourceRequest enrichedAsk :
appContext.getOutstandingOpReqs().get(schedKey).values()) {
+ int remainingAllocs = -1;
+ if (maxAllocations > 0) {
+ int totalAllocated = 0;
+ for (List<Allocation> allocs : containers.values()) {
+ totalAllocated += allocs.size();
+ }
+ remainingAllocs = maxAllocations - totalAllocated;
+ if (remainingAllocs <= 0) {
+ LOG.info("Not allocating more containers as max allocations per AM "
+ + "heartbeat {} has reached", maxAllocationsPerAMHeartbeat);
+ break;
+ }
+ }
allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
appContext.getContainerIdGenerator(), blackList, allocatedNodes,
- appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk);
+ appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk,
+ remainingAllocs);
ResourceRequest anyAsk = enrichedAsk.getRequest();
if (!containers.isEmpty()) {
LOG.info("Opportunistic allocation requested for [priority={}, "
@@ -384,7 +441,7 @@ public class OpportunisticContainerAllocator {
Set<String> blacklist, Set<String> allocatedNodes,
ApplicationAttemptId id, Map<String, RemoteNode> allNodes,
String userName, Map<Resource, List<Allocation>> allocations,
- EnrichedResourceRequest enrichedAsk)
+ EnrichedResourceRequest enrichedAsk, int maxAllocations)
throws YarnException {
if (allNodes.size() == 0) {
LOG.info("No nodes currently available to " +
@@ -397,6 +454,9 @@ public class OpportunisticContainerAllocator {
allocations.get(anyAsk.getCapability()).size());
toAllocate = Math.min(toAllocate,
appParams.getMaxAllocationsPerSchedulerKeyPerRound());
+ if (maxAllocations >= 0) {
+ toAllocate = Math.min(maxAllocations, toAllocate);
+ }
int numAllocated = 0;
// Node Candidates are selected as follows:
// * Node local candidates selected in loop == 0
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
index 65ad748..57e397d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
@@ -643,4 +643,190 @@ public class TestOpportunisticContainerAllocator {
Assert.assertEquals(1, containers.size());
Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size());
}
+
+ /**
+ * Tests that at most maxAllocationsPerAMHeartbeat (set to 2) opportunistic
+ * containers are allocated per AM heartbeat and the rest in the next one.
+ * @throws Exception on allocation failure
+ */
+ @Test
+ public void testMaxAllocationsPerAMHeartbeat() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ allocator.setMaxAllocationsPerAMHeartbeat(2);
+ final Priority priority = Priority.newInstance(1);
+ final ExecutionTypeRequest oppRequest = ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true);
+ final Resource resource = Resources.createResource(1 * GB);
+ List<ResourceRequest> reqs =
+ Arrays.asList(
+ ResourceRequest.newInstance(priority, "*",
+ resource, 3, true, null, oppRequest),
+ ResourceRequest.newInstance(priority, "h6",
+ resource, 3, true, null, oppRequest),
+ ResourceRequest.newInstance(priority, "/r3",
+ resource, 3, true, null, oppRequest));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ oppCntxt.updateNodeList(
+ Arrays.asList(
+ RemoteNode.newInstance(
+ NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
+
+ List<Container> containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1");
+ LOG.info("Containers: {}", containers);
+ // Capacity is available, but only 2 containers should be allocated
+ // because max allocations per AM heartbeat is set to 2.
+ Assert.assertEquals(2, containers.size());
+ containers = allocator.allocateContainers(
+ blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "user1");
+ LOG.info("Containers: {}", containers);
+ // The remaining 1 container should be allocated in the next heartbeat.
+ Assert.assertEquals(1, containers.size());
+ }
+
+ /**
+ * Tests maximum opportunistic container allocation per AM heartbeat for
+ * allocation requests with different scheduler keys (priorities 1 to 3).
+ * @throws Exception on allocation failure
+ */
+ @Test
+ public void testMaxAllocationsPerAMHeartbeatDifferentSchedKey()
+ throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ allocator.setMaxAllocationsPerAMHeartbeat(2);
+ final ExecutionTypeRequest oppRequest = ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true);
+ final Resource resource = Resources.createResource(1 * GB);
+ List<ResourceRequest> reqs =
+ Arrays.asList(
+ ResourceRequest.newInstance(Priority.newInstance(1), "*",
+ resource, 1, true, null, oppRequest),
+ ResourceRequest.newInstance(Priority.newInstance(2), "h6",
+ resource, 2, true, null, oppRequest),
+ ResourceRequest.newInstance(Priority.newInstance(3), "/r3",
+ resource, 2, true, null, oppRequest));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ oppCntxt.updateNodeList(
+ Arrays.asList(
+ RemoteNode.newInstance(
+ NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
+
+ List<Container> containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1");
+ LOG.info("Containers: {}", containers);
+ // Capacity is available, but only 2 containers should be allocated
+ // because max allocations per AM heartbeat is set to 2.
+ Assert.assertEquals(2, containers.size());
+ containers = allocator.allocateContainers(
+ blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "user1");
+ LOG.info("Containers: {}", containers);
+ // 2 more containers should be allocated from pending allocation requests.
+ Assert.assertEquals(2, containers.size());
+ containers = allocator.allocateContainers(
+ blacklistRequest, new ArrayList<>(), appAttId, oppCntxt, 1L, "user1");
+ LOG.info("Containers: {}", containers);
+ // The remaining 1 container should be allocated in the third heartbeat.
+ Assert.assertEquals(1, containers.size());
+ }
+
+ /**
+ * Tests maximum opportunistic container allocation per AM heartbeat when
+ * the limit is set to -1, i.e. no per-heartbeat limit is applied.
+ * @throws Exception on allocation failure
+ */
+ @Test
+ public void testMaxAllocationsPerAMHeartbeatWithNoLimit() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ allocator.setMaxAllocationsPerAMHeartbeat(-1);
+
+ Priority priority = Priority.newInstance(1);
+ Resource capability = Resources.createResource(1 * GB);
+ List<ResourceRequest> reqs = new ArrayList<>();
+ for (int i = 0; i < 20; i++) {
+ reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
+ .priority(priority)
+ .resourceName("h1")
+ .capability(capability)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build());
+ }
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ oppCntxt.updateNodeList(
+ Arrays.asList(
+ RemoteNode.newInstance(
+ NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h2", 1234), "h2:1234", "/r1")));
+
+ List<Container> containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1");
+
+ // All 20 containers should be allocated in a single heartbeat.
+ Assert.assertEquals(20, containers.size());
+ }
+
+ /**
+ * Tests maximum opportunistic container allocation per AM heartbeat when
+ * the limit (100) is higher than the 20 outstanding requests.
+ * @throws Exception on allocation failure
+ */
+ @Test
+ public void testMaxAllocationsPerAMHeartbeatWithHighLimit()
+ throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ allocator.setMaxAllocationsPerAMHeartbeat(100);
+
+ Priority priority = Priority.newInstance(1);
+ Resource capability = Resources.createResource(1 * GB);
+ List<ResourceRequest> reqs = new ArrayList<>();
+ for (int i = 0; i < 20; i++) {
+ reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
+ .priority(priority)
+ .resourceName("h1")
+ .capability(capability)
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build());
+ }
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ oppCntxt.updateNodeList(
+ Arrays.asList(
+ RemoteNode.newInstance(
+ NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
+ RemoteNode.newInstance(
+ NodeId.newInstance("h2", 1234), "h2:1234", "/r1")));
+
+ List<Container> containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "user1");
+
+ // All 20 containers should be allocated in a single heartbeat.
+ Assert.assertEquals(20, containers.size());
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 1ed1fda..89e3b47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -473,10 +473,14 @@ public class NodeManager extends CompositeService
.getContainersMonitor(), this.aclsManager, dirsHandler);
addService(webServer);
((NMContext) context).setWebServer(webServer);
-
+ int maxAllocationsPerAMHeartbeat = conf.getInt(
+ YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT,
+ YarnConfiguration.
+ DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT);
((NMContext) context).setQueueableContainerAllocator(
new OpportunisticContainerAllocator(
- context.getContainerTokenSecretManager()));
+ context.getContainerTokenSecretManager(),
+ maxAllocationsPerAMHeartbeat));
dispatcher.register(ContainerManagerEventType.class, containerManager);
dispatcher.register(NodeManagerEventType.class, this);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index 9e861bd..a360ed2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -229,8 +229,13 @@ public class OpportunisticContainerAllocatorAMService
YarnScheduler scheduler) {
super(OpportunisticContainerAllocatorAMService.class.getName(),
rmContext, scheduler);
+ int maxAllocationsPerAMHeartbeat = rmContext.getYarnConfiguration().getInt(
+ YarnConfiguration.OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT,
+ YarnConfiguration.
+ DEFAULT_OPP_CONTAINER_MAX_ALLOCATIONS_PER_AM_HEARTBEAT);
this.oppContainerAllocator = new OpportunisticContainerAllocator(
- rmContext.getContainerTokenSecretManager());
+ rmContext.getContainerTokenSecretManager(),
+ maxAllocationsPerAMHeartbeat);
this.k = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org