You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2018/05/04 22:00:08 UTC

hadoop git commit: YARN-8163. Add support for Node Labels in opportunistic scheduling. Contributed by Abhishek Modi.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 4cdbdce75 -> 6a69239d8


YARN-8163. Add support for Node Labels in opportunistic scheduling. Contributed by Abhishek Modi.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6a69239d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6a69239d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6a69239d

Branch: refs/heads/trunk
Commit: 6a69239d867070ee85d79026542033ac661c4c1c
Parents: 4cdbdce
Author: Inigo Goiri <in...@apache.org>
Authored: Fri May 4 14:59:59 2018 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Fri May 4 14:59:59 2018 -0700

----------------------------------------------------------------------
 .../server/api/protocolrecords/RemoteNode.java  | 40 +++++++++++++++++++-
 .../impl/pb/RemoteNodePBImpl.java               | 19 ++++++++++
 .../OpportunisticContainerAllocator.java        | 38 ++++++++++++++++---
 .../yarn_server_common_service_protos.proto     |  1 +
 .../TestOpportunisticContainerAllocator.java    | 37 ++++++++++++++++++
 ...pportunisticContainerAllocatorAMService.java | 12 ++++++
 ...pportunisticContainerAllocatorAMService.java | 10 ++++-
 7 files changed, 149 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a69239d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
index f621aa2..67ad5ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
@@ -65,6 +65,26 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
   }
 
   /**
+   * Create new Instance.
+   * @param nodeId NodeId.
+   * @param httpAddress Http address.
+   * @param rackName Rack Name.
+   * @param nodePartition Node Partition.
+   * @return RemoteNode Instance.
+   */
+  @Private
+  @Unstable
+  public static RemoteNode newInstance(NodeId nodeId, String httpAddress,
+      String rackName, String nodePartition) {
+    RemoteNode remoteNode = Records.newRecord(RemoteNode.class);
+    remoteNode.setNodeId(nodeId);
+    remoteNode.setHttpAddress(httpAddress);
+    remoteNode.setRackName(rackName);
+    remoteNode.setNodePartition(nodePartition);
+    return remoteNode;
+  }
+
+  /**
    * Get {@link NodeId}.
    * @return NodeId.
    */
@@ -117,6 +137,23 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
    * @param other RemoteNode.
    * @return Comparison.
    */
+
+  /**
+   * Get Node Partition.
+   * @return Node Partition.
+   */
+  @Private
+  @Unstable
+  public  abstract String getNodePartition();
+
+  /**
+   * Set Node Partition.
+   * @param nodePartition
+   */
+  @Private
+  @Unstable
+  public abstract void setNodePartition(String nodePartition);
+
   @Override
   public int compareTo(RemoteNode other) {
     return this.getNodeId().compareTo(other.getNodeId());
@@ -127,6 +164,7 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
     return "RemoteNode{" +
         "nodeId=" + getNodeId() + ", " +
         "rackName=" + getRackName() + ", " +
-        "httpAddress=" + getHttpAddress() + "}";
+        "httpAddress=" + getHttpAddress() + ", " +
+        "partition=" + getNodePartition() + "}";
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a69239d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
index c2492cf..8fb4357 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
@@ -137,6 +137,25 @@ public class RemoteNodePBImpl extends RemoteNode {
   }
 
   @Override
+  public String getNodePartition() {
+    RemoteNodeProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasNodePartition()) {
+      return null;
+    }
+    return (p.getNodePartition());
+  }
+
+  @Override
+  public void setNodePartition(String nodePartition) {
+    maybeInitBuilder();
+    if (nodePartition == null) {
+      builder.clearNodePartition();
+      return;
+    }
+    builder.setNodePartition(nodePartition);
+  }
+
+  @Override
   public int hashCode() {
     return getProto().hashCode();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a69239d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 1f53648..ae1ba43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.scheduler;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -461,10 +462,17 @@ public class OpportunisticContainerAllocator {
   private Collection<RemoteNode> findNodeCandidates(int loopIndex,
       Map<String, RemoteNode> allNodes, Set<String> blackList,
       EnrichedResourceRequest enrichedRR) {
+    LinkedList<RemoteNode> retList = new LinkedList<>();
+    String partition = getRequestPartition(enrichedRR);
     if (loopIndex > 1) {
-      return allNodes.values();
+      for (RemoteNode remoteNode : allNodes.values()) {
+        if (StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) {
+          retList.add(remoteNode);
+        }
+      }
+      return retList;
     } else {
-      LinkedList<RemoteNode> retList = new LinkedList<>();
+
       int numContainers = enrichedRR.getRequest().getNumContainers();
       while (numContainers > 0) {
         if (loopIndex == 0) {
@@ -489,8 +497,10 @@ public class OpportunisticContainerAllocator {
   private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
       EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList,
       Set<String> blackList, int numContainers) {
+    String partition = getRequestPartition(enrichedRR);
     for (RemoteNode rNode : allNodes.values()) {
-      if (enrichedRR.getRackLocations().contains(rNode.getRackName())) {
+      if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) &&
+          enrichedRR.getRackLocations().contains(rNode.getRackName())) {
         if (blackList.contains(rNode.getNodeId().getHost())) {
           retList.addLast(rNode);
         } else {
@@ -508,9 +518,11 @@ public class OpportunisticContainerAllocator {
   private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes,
       EnrichedResourceRequest enrichedRR, List<RemoteNode> retList,
       int numContainers) {
+    String partition = getRequestPartition(enrichedRR);
     for (String nodeName : enrichedRR.getNodeLocations()) {
       RemoteNode remoteNode = allNodes.get(nodeName);
-      if (remoteNode != null) {
+      if (remoteNode != null &&
+          StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) {
         retList.add(remoteNode);
         numContainers--;
       }
@@ -563,7 +575,7 @@ public class OpportunisticContainerAllocator {
             capability, currTime + tokenExpiry,
             tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
             schedulerKey.getPriority(), currTime,
-            null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
+            null, getRemoteNodePartition(node), ContainerType.TASK,
             ExecutionType.OPPORTUNISTIC, schedulerKey.getAllocationRequestId());
     byte[] pwd =
         tokenSecretManager.createPassword(containerTokenIdentifier);
@@ -616,4 +628,20 @@ public class OpportunisticContainerAllocator {
     }
     return partitionedRequests;
   }
+
+  private String getRequestPartition(EnrichedResourceRequest enrichedRR) {
+    String partition = enrichedRR.getRequest().getNodeLabelExpression();
+    if (partition == null) {
+      partition = CommonNodeLabelsManager.NO_LABEL;
+    }
+    return partition;
+  }
+
+  private String getRemoteNodePartition(RemoteNode node) {
+    String partition = node.getNodePartition();
+    if (partition == null) {
+      partition = CommonNodeLabelsManager.NO_LABEL;
+    }
+    return partition;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a69239d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 1b090bf..387ddb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -31,6 +31,7 @@ message RemoteNodeProto {
   optional NodeIdProto node_id = 1;
   optional string http_address = 2;
   optional string rack_name = 3;
+  optional string node_partition = 4;
 }
 
 message RegisterDistributedSchedulingAMResponseProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a69239d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
index 788b0b3..2d3b099 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
@@ -596,4 +596,41 @@ public class TestOpportunisticContainerAllocator {
     }
     Assert.assertEquals(100, containers.size());
   }
+
+  @Test
+  public void testAllocationWithNodeLabels() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<>(), new ArrayList<>());
+    List<ResourceRequest> reqs =
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 1, true, "label",
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h1", 1234), "h1:1234", "/r1")));
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    /* Since there is no node satisfying node label constraints, requests
+       won't get fulfilled.
+    */
+    Assert.assertEquals(0, containers.size());
+    Assert.assertEquals(1, oppCntxt.getOutstandingOpReqs().size());
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h1", 1234), "h1:1234", "/r1",
+                "label")));
+
+    containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    Assert.assertEquals(1, containers.size());
+    Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a69239d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index ce425df..9b13627 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
@@ -174,6 +175,16 @@ public class OpportunisticContainerAllocatorAMService
           appAttempt.getOpportunisticContainerContext();
       oppCtx.updateNodeList(getLeastLoadedNodes());
 
+      if (!partitionedAsks.getOpportunistic().isEmpty()) {
+        String appPartition = appAttempt.getAppAMNodePartitionName();
+
+        for (ResourceRequest req : partitionedAsks.getOpportunistic()) {
+          if (null == req.getNodeLabelExpression()) {
+            req.setNodeLabelExpression(appPartition);
+          }
+        }
+      }
+
       List<Container> oppContainers =
           oppContainerAllocator.allocateContainers(
               request.getResourceBlacklistRequest(),
@@ -436,6 +447,7 @@ public class OpportunisticContainerAllocatorAMService
     if (node != null) {
       RemoteNode rNode = RemoteNode.newInstance(nodeId, node.getHttpAddress());
       rNode.setRackName(node.getRackName());
+      rNode.setNodePartition(node.getPartition());
       return rNode;
     }
     return null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a69239d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index efa76bc..5542157 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -927,6 +927,8 @@ public class TestOpportunisticContainerAllocatorAMService {
                 distAllReq.getProto()));
     Assert.assertEquals(
         "h1", dsAllocResp.getNodesForScheduling().get(0).getNodeId().getHost());
+    Assert.assertEquals(
+        "l1", dsAllocResp.getNodesForScheduling().get(1).getNodePartition());
 
     FinishApplicationMasterResponse dsfinishResp =
         new FinishApplicationMasterResponsePBImpl(
@@ -1004,9 +1006,13 @@ public class TestOpportunisticContainerAllocatorAMService {
             .getExecutionTypeRequest().getEnforceExecutionType());
         DistributedSchedulingAllocateResponse resp = factory
             .newRecordInstance(DistributedSchedulingAllocateResponse.class);
+        RemoteNode remoteNode1 = RemoteNode.newInstance(
+            NodeId.newInstance("h1", 1234), "http://h1:4321");
+        RemoteNode remoteNode2 = RemoteNode.newInstance(
+            NodeId.newInstance("h2", 1234), "http://h2:4321");
+        remoteNode2.setNodePartition("l1");
         resp.setNodesForScheduling(
-            Arrays.asList(RemoteNode.newInstance(
-                NodeId.newInstance("h1", 1234), "http://h1:4321")));
+            Arrays.asList(remoteNode1, remoteNode2));
         return resp;
       }
     };


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org