You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/04/19 16:41:15 UTC
[hadoop] branch trunk updated: YARN-9448. Fix Opportunistic Scheduling for node local allocations. Contributed by Abhishek Modi.
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new aeadb94 YARN-9448. Fix Opportunistic Scheduling for node local allocations. Contributed by Abhishek Modi.
aeadb94 is described below
commit aeadb9432f84e679f00a9a12f63675c456bc14a8
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Fri Apr 19 09:41:06 2019 -0700
YARN-9448. Fix Opportunistic Scheduling for node local allocations. Contributed by Abhishek Modi.
---
.../scheduler/OpportunisticContainerAllocator.java | 44 +++++++++++++---------
.../TestOpportunisticContainerAllocator.java | 28 ++++++++------
2 files changed, 43 insertions(+), 29 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 5600aa8..b31bd69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -318,6 +318,7 @@ public class OpportunisticContainerAllocator {
opportContext.addToOutstandingReqs(oppResourceReqs);
Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist());
+ Set<String> allocatedNodes = new HashSet<>();
List<Container> allocatedContainers = new ArrayList<>();
// Satisfy the outstanding OPPORTUNISTIC requests.
@@ -335,7 +336,7 @@ public class OpportunisticContainerAllocator {
// the outstanding reqs)
Map<Resource, List<Allocation>> allocation = allocate(
rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
- appSubmitter, nodeBlackList);
+ appSubmitter, nodeBlackList, allocatedNodes);
if (allocation.size() > 0) {
allocations.add(allocation);
continueLoop = true;
@@ -357,14 +358,15 @@ public class OpportunisticContainerAllocator {
private Map<Resource, List<Allocation>> allocate(long rmIdentifier,
OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
- ApplicationAttemptId appAttId, String userName, Set<String> blackList)
+ ApplicationAttemptId appAttId, String userName, Set<String> blackList,
+ Set<String> allocatedNodes)
throws YarnException {
Map<Resource, List<Allocation>> containers = new HashMap<>();
for (EnrichedResourceRequest enrichedAsk :
appContext.getOutstandingOpReqs().get(schedKey).values()) {
allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
- appContext.getContainerIdGenerator(), blackList, appAttId,
- appContext.getNodeMap(), userName, containers, enrichedAsk);
+ appContext.getContainerIdGenerator(), blackList, allocatedNodes,
+ appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk);
ResourceRequest anyAsk = enrichedAsk.getRequest();
if (!containers.isEmpty()) {
LOG.info("Opportunistic allocation requested for [priority={}, "
@@ -379,9 +381,9 @@ public class OpportunisticContainerAllocator {
private void allocateContainersInternal(long rmIdentifier,
AllocationParams appParams, ContainerIdGenerator idCounter,
- Set<String> blacklist, ApplicationAttemptId id,
- Map<String, RemoteNode> allNodes, String userName,
- Map<Resource, List<Allocation>> allocations,
+ Set<String> blacklist, Set<String> allocatedNodes,
+ ApplicationAttemptId id, Map<String, RemoteNode> allNodes,
+ String userName, Map<Resource, List<Allocation>> allocations,
EnrichedResourceRequest enrichedAsk)
throws YarnException {
if (allNodes.size() == 0) {
@@ -406,7 +408,8 @@ public class OpportunisticContainerAllocator {
}
while (numAllocated < toAllocate) {
Collection<RemoteNode> nodeCandidates =
- findNodeCandidates(loopIndex, allNodes, blacklist, enrichedAsk);
+ findNodeCandidates(loopIndex, allNodes, blacklist, allocatedNodes,
+ enrichedAsk);
for (RemoteNode rNode : nodeCandidates) {
String rNodeHost = rNode.getNodeId().getHost();
// Ignore black list
@@ -422,6 +425,10 @@ public class OpportunisticContainerAllocator {
} else {
continue;
}
+ } else if (allocatedNodes.contains(rNodeHost)) {
+ LOG.info("Opportunistic container has already been allocated on {}.",
+ rNodeHost);
+ continue;
}
if (loopIndex == RACK_LOCAL_LOOP) {
if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) {
@@ -435,11 +442,7 @@ public class OpportunisticContainerAllocator {
anyAsk, rNode);
numAllocated++;
updateMetrics(loopIndex);
- // Try to spread the allocations across the nodes.
- // But don't add if it is a node local request.
- if (loopIndex != NODE_LOCAL_LOOP) {
- blacklist.add(rNode.getNodeId().getHost());
- }
+ allocatedNodes.add(rNodeHost);
LOG.info("Allocated [" + container.getId() + "] as opportunistic at " +
"location [" + location + "]");
if (numAllocated >= toAllocate) {
@@ -475,7 +478,7 @@ public class OpportunisticContainerAllocator {
private Collection<RemoteNode> findNodeCandidates(int loopIndex,
Map<String, RemoteNode> allNodes, Set<String> blackList,
- EnrichedResourceRequest enrichedRR) {
+ Set<String> allocatedNodes, EnrichedResourceRequest enrichedRR) {
LinkedList<RemoteNode> retList = new LinkedList<>();
String partition = getRequestPartition(enrichedRR);
if (loopIndex > 1) {
@@ -495,8 +498,9 @@ public class OpportunisticContainerAllocator {
allNodes, enrichedRR, retList, numContainers);
} else {
// Rack local candidates
- numContainers = collectRackLocalCandidates(
- allNodes, enrichedRR, retList, blackList, numContainers);
+ numContainers =
+ collectRackLocalCandidates(allNodes, enrichedRR, retList,
+ blackList, allocatedNodes, numContainers);
}
if (numContainers == enrichedRR.getRequest().getNumContainers()) {
// If there is no change in numContainers, then there is no point
@@ -510,12 +514,16 @@ public class OpportunisticContainerAllocator {
private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList,
- Set<String> blackList, int numContainers) {
+ Set<String> blackList, Set<String> allocatedNodes, int numContainers) {
String partition = getRequestPartition(enrichedRR);
for (RemoteNode rNode : allNodes.values()) {
if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) &&
enrichedRR.getRackLocations().contains(rNode.getRackName())) {
- if (blackList.contains(rNode.getNodeId().getHost())) {
+ String rHost = rNode.getNodeId().getHost();
+ if (blackList.contains(rHost)) {
+ continue;
+ }
+ if (allocatedNodes.contains(rHost)) {
retList.addLast(rNode);
} else {
retList.addFirst(rNode);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
index 6f71b36..65ad748 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
@@ -198,35 +198,41 @@ public class TestOpportunisticContainerAllocator {
Arrays.asList(
ResourceRequest.newBuilder().allocationRequestId(1)
.priority(Priority.newInstance(1))
+ .resourceName(ResourceRequest.ANY)
+ .capability(Resources.createResource(1 * GB))
+ .relaxLocality(true)
+ .executionType(ExecutionType.OPPORTUNISTIC).build(),
+ ResourceRequest.newBuilder().allocationRequestId(2)
+ .priority(Priority.newInstance(1))
.resourceName("/r1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
- ResourceRequest.newBuilder().allocationRequestId(1)
+ ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1))
.resourceName("h1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
- ResourceRequest.newBuilder().allocationRequestId(1)
+ ResourceRequest.newBuilder().allocationRequestId(2)
.priority(Priority.newInstance(1))
.resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
- ResourceRequest.newBuilder().allocationRequestId(2)
+ ResourceRequest.newBuilder().allocationRequestId(3)
.priority(Priority.newInstance(1))
.resourceName("/r1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
- ResourceRequest.newBuilder().allocationRequestId(2)
+ ResourceRequest.newBuilder().allocationRequestId(3)
.priority(Priority.newInstance(1))
.resourceName("h1")
.capability(Resources.createResource(1 * GB))
.relaxLocality(true)
.executionType(ExecutionType.OPPORTUNISTIC).build(),
- ResourceRequest.newBuilder().allocationRequestId(2)
+ ResourceRequest.newBuilder().allocationRequestId(3)
.priority(Priority.newInstance(1))
.resourceName(ResourceRequest.ANY)
.capability(Resources.createResource(1 * GB))
@@ -247,14 +253,14 @@ public class TestOpportunisticContainerAllocator {
List<Container> containers = allocator.allocateContainers(
blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
LOG.info("Containers: {}", containers);
- Set<String> allocatedHosts = new HashSet<>();
+ // all 3 containers should be allocated.
+ Assert.assertEquals(3, containers.size());
+ // container with allocation id 2 and 3 should be allocated on node h1
for (Container c : containers) {
- allocatedHosts.add(c.getNodeHttpAddress());
+ if (c.getAllocationRequestId() == 2 || c.getAllocationRequestId() == 3) {
+ Assert.assertEquals("h1:1234", c.getNodeHttpAddress());
+ }
}
- Assert.assertEquals(2, containers.size());
- Assert.assertTrue(allocatedHosts.contains("h1:1234"));
- Assert.assertFalse(allocatedHosts.contains("h2:1234"));
- Assert.assertFalse(allocatedHosts.contains("h3:1234"));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org