You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/04/19 16:41:15 UTC

[hadoop] branch trunk updated: YARN-9448. Fix Opportunistic Scheduling for node local allocations. Contributed by Abhishek Modi.

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aeadb94  YARN-9448. Fix Opportunistic Scheduling for node local allocations. Contributed by Abhishek Modi.
aeadb94 is described below

commit aeadb9432f84e679f00a9a12f63675c456bc14a8
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Fri Apr 19 09:41:06 2019 -0700

    YARN-9448. Fix Opportunistic Scheduling for node local allocations. Contributed by Abhishek Modi.
---
 .../scheduler/OpportunisticContainerAllocator.java | 44 +++++++++++++---------
 .../TestOpportunisticContainerAllocator.java       | 28 ++++++++------
 2 files changed, 43 insertions(+), 29 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 5600aa8..b31bd69 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -318,6 +318,7 @@ public class OpportunisticContainerAllocator {
     opportContext.addToOutstandingReqs(oppResourceReqs);
 
     Set<String> nodeBlackList = new HashSet<>(opportContext.getBlacklist());
+    Set<String> allocatedNodes = new HashSet<>();
     List<Container> allocatedContainers = new ArrayList<>();
 
     // Satisfy the outstanding OPPORTUNISTIC requests.
@@ -335,7 +336,7 @@ public class OpportunisticContainerAllocator {
         //          the outstanding reqs)
         Map<Resource, List<Allocation>> allocation = allocate(
             rmIdentifier, opportContext, schedulerKey, applicationAttemptId,
-            appSubmitter, nodeBlackList);
+            appSubmitter, nodeBlackList, allocatedNodes);
         if (allocation.size() > 0) {
           allocations.add(allocation);
           continueLoop = true;
@@ -357,14 +358,15 @@ public class OpportunisticContainerAllocator {
 
   private Map<Resource, List<Allocation>> allocate(long rmIdentifier,
       OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
-      ApplicationAttemptId appAttId, String userName, Set<String> blackList)
+      ApplicationAttemptId appAttId, String userName, Set<String> blackList,
+      Set<String> allocatedNodes)
       throws YarnException {
     Map<Resource, List<Allocation>> containers = new HashMap<>();
     for (EnrichedResourceRequest enrichedAsk :
         appContext.getOutstandingOpReqs().get(schedKey).values()) {
       allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
-          appContext.getContainerIdGenerator(), blackList, appAttId,
-          appContext.getNodeMap(), userName, containers, enrichedAsk);
+          appContext.getContainerIdGenerator(), blackList, allocatedNodes,
+          appAttId, appContext.getNodeMap(), userName, containers, enrichedAsk);
       ResourceRequest anyAsk = enrichedAsk.getRequest();
       if (!containers.isEmpty()) {
         LOG.info("Opportunistic allocation requested for [priority={}, "
@@ -379,9 +381,9 @@ public class OpportunisticContainerAllocator {
 
   private void allocateContainersInternal(long rmIdentifier,
       AllocationParams appParams, ContainerIdGenerator idCounter,
-      Set<String> blacklist, ApplicationAttemptId id,
-      Map<String, RemoteNode> allNodes, String userName,
-      Map<Resource, List<Allocation>> allocations,
+      Set<String> blacklist, Set<String> allocatedNodes,
+      ApplicationAttemptId id, Map<String, RemoteNode> allNodes,
+      String userName, Map<Resource, List<Allocation>> allocations,
       EnrichedResourceRequest enrichedAsk)
       throws YarnException {
     if (allNodes.size() == 0) {
@@ -406,7 +408,8 @@ public class OpportunisticContainerAllocator {
     }
     while (numAllocated < toAllocate) {
       Collection<RemoteNode> nodeCandidates =
-          findNodeCandidates(loopIndex, allNodes, blacklist, enrichedAsk);
+          findNodeCandidates(loopIndex, allNodes, blacklist, allocatedNodes,
+              enrichedAsk);
       for (RemoteNode rNode : nodeCandidates) {
         String rNodeHost = rNode.getNodeId().getHost();
         // Ignore black list
@@ -422,6 +425,10 @@ public class OpportunisticContainerAllocator {
           } else {
             continue;
           }
+        } else if (allocatedNodes.contains(rNodeHost)) {
+          LOG.info("Opportunistic container has already been allocated on {}.",
+              rNodeHost);
+          continue;
         }
         if (loopIndex == RACK_LOCAL_LOOP) {
           if (enrichedAsk.getRackLocations().contains(rNode.getRackName())) {
@@ -435,11 +442,7 @@ public class OpportunisticContainerAllocator {
             anyAsk, rNode);
         numAllocated++;
         updateMetrics(loopIndex);
-        // Try to spread the allocations across the nodes.
-        // But don't add if it is a node local request.
-        if (loopIndex != NODE_LOCAL_LOOP) {
-          blacklist.add(rNode.getNodeId().getHost());
-        }
+        allocatedNodes.add(rNodeHost);
         LOG.info("Allocated [" + container.getId() + "] as opportunistic at " +
             "location [" + location + "]");
         if (numAllocated >= toAllocate) {
@@ -475,7 +478,7 @@ public class OpportunisticContainerAllocator {
 
   private Collection<RemoteNode> findNodeCandidates(int loopIndex,
       Map<String, RemoteNode> allNodes, Set<String> blackList,
-      EnrichedResourceRequest enrichedRR) {
+      Set<String> allocatedNodes, EnrichedResourceRequest enrichedRR) {
     LinkedList<RemoteNode> retList = new LinkedList<>();
     String partition = getRequestPartition(enrichedRR);
     if (loopIndex > 1) {
@@ -495,8 +498,9 @@ public class OpportunisticContainerAllocator {
               allNodes, enrichedRR, retList, numContainers);
         } else {
           // Rack local candidates
-          numContainers = collectRackLocalCandidates(
-              allNodes, enrichedRR, retList, blackList, numContainers);
+          numContainers =
+              collectRackLocalCandidates(allNodes, enrichedRR, retList,
+                  blackList, allocatedNodes, numContainers);
         }
         if (numContainers == enrichedRR.getRequest().getNumContainers()) {
           // If there is no change in numContainers, then there is no point
@@ -510,12 +514,16 @@ public class OpportunisticContainerAllocator {
 
   private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
       EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList,
-      Set<String> blackList, int numContainers) {
+      Set<String> blackList, Set<String> allocatedNodes, int numContainers) {
     String partition = getRequestPartition(enrichedRR);
     for (RemoteNode rNode : allNodes.values()) {
       if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) &&
           enrichedRR.getRackLocations().contains(rNode.getRackName())) {
-        if (blackList.contains(rNode.getNodeId().getHost())) {
+        String rHost = rNode.getNodeId().getHost();
+        if (blackList.contains(rHost)) {
+          continue;
+        }
+        if (allocatedNodes.contains(rHost)) {
           retList.addLast(rNode);
         } else {
           retList.addFirst(rNode);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
index 6f71b36..65ad748 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
@@ -198,35 +198,41 @@ public class TestOpportunisticContainerAllocator {
         Arrays.asList(
             ResourceRequest.newBuilder().allocationRequestId(1)
                 .priority(Priority.newInstance(1))
+                .resourceName(ResourceRequest.ANY)
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(2)
+                .priority(Priority.newInstance(1))
                 .resourceName("/r1")
                 .capability(Resources.createResource(1 * GB))
                 .relaxLocality(true)
                 .executionType(ExecutionType.OPPORTUNISTIC).build(),
-            ResourceRequest.newBuilder().allocationRequestId(1)
+            ResourceRequest.newBuilder().allocationRequestId(2)
                 .priority(Priority.newInstance(1))
                 .resourceName("h1")
                 .capability(Resources.createResource(1 * GB))
                 .relaxLocality(true)
                 .executionType(ExecutionType.OPPORTUNISTIC).build(),
-            ResourceRequest.newBuilder().allocationRequestId(1)
+            ResourceRequest.newBuilder().allocationRequestId(2)
                 .priority(Priority.newInstance(1))
                 .resourceName(ResourceRequest.ANY)
                 .capability(Resources.createResource(1 * GB))
                 .relaxLocality(true)
                 .executionType(ExecutionType.OPPORTUNISTIC).build(),
-            ResourceRequest.newBuilder().allocationRequestId(2)
+            ResourceRequest.newBuilder().allocationRequestId(3)
                 .priority(Priority.newInstance(1))
                 .resourceName("/r1")
                 .capability(Resources.createResource(1 * GB))
                 .relaxLocality(true)
                 .executionType(ExecutionType.OPPORTUNISTIC).build(),
-            ResourceRequest.newBuilder().allocationRequestId(2)
+            ResourceRequest.newBuilder().allocationRequestId(3)
                 .priority(Priority.newInstance(1))
                 .resourceName("h1")
                 .capability(Resources.createResource(1 * GB))
                 .relaxLocality(true)
                 .executionType(ExecutionType.OPPORTUNISTIC).build(),
-            ResourceRequest.newBuilder().allocationRequestId(2)
+            ResourceRequest.newBuilder().allocationRequestId(3)
                 .priority(Priority.newInstance(1))
                 .resourceName(ResourceRequest.ANY)
                 .capability(Resources.createResource(1 * GB))
@@ -247,14 +253,14 @@ public class TestOpportunisticContainerAllocator {
     List<Container> containers = allocator.allocateContainers(
         blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
     LOG.info("Containers: {}", containers);
-    Set<String> allocatedHosts = new HashSet<>();
+    // all 3 containers should be allocated.
+    Assert.assertEquals(3, containers.size());
+    // containers with allocation ids 2 and 3 should be allocated on node h1
     for (Container c : containers) {
-      allocatedHosts.add(c.getNodeHttpAddress());
+      if (c.getAllocationRequestId() == 2 || c.getAllocationRequestId() == 3) {
+        Assert.assertEquals("h1:1234", c.getNodeHttpAddress());
+      }
     }
-    Assert.assertEquals(2, containers.size());
-    Assert.assertTrue(allocatedHosts.contains("h1:1234"));
-    Assert.assertFalse(allocatedHosts.contains("h2:1234"));
-    Assert.assertFalse(allocatedHosts.contains("h3:1234"));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org