You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2018/05/04 22:00:08 UTC
hadoop git commit: YARN-8163. Add support for Node Labels in
opportunistic scheduling. Contributed by Abhishek Modi.
Repository: hadoop
Updated Branches:
refs/heads/trunk 4cdbdce75 -> 6a69239d8
YARN-8163. Add support for Node Labels in opportunistic scheduling. Contributed by Abhishek Modi.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6a69239d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6a69239d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6a69239d
Branch: refs/heads/trunk
Commit: 6a69239d867070ee85d79026542033ac661c4c1c
Parents: 4cdbdce
Author: Inigo Goiri <in...@apache.org>
Authored: Fri May 4 14:59:59 2018 -0700
Committer: Inigo Goiri <in...@apache.org>
Committed: Fri May 4 14:59:59 2018 -0700
----------------------------------------------------------------------
.../server/api/protocolrecords/RemoteNode.java | 40 +++++++++++++++++++-
.../impl/pb/RemoteNodePBImpl.java | 19 ++++++++++
.../OpportunisticContainerAllocator.java | 38 ++++++++++++++++---
.../yarn_server_common_service_protos.proto | 1 +
.../TestOpportunisticContainerAllocator.java | 37 ++++++++++++++++++
...pportunisticContainerAllocatorAMService.java | 12 ++++++
...pportunisticContainerAllocatorAMService.java | 10 ++++-
7 files changed, 149 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a69239d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
index f621aa2..67ad5ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RemoteNode.java
@@ -65,6 +65,26 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
}
/**
+ * Create new Instance.
+ * @param nodeId NodeId.
+ * @param httpAddress Http address.
+ * @param rackName Rack Name.
+ * @param nodePartition Node Partition.
+ * @return RemoteNode Instance.
+ */
+ @Private
+ @Unstable
+ public static RemoteNode newInstance(NodeId nodeId, String httpAddress,
+ String rackName, String nodePartition) {
+ RemoteNode remoteNode = Records.newRecord(RemoteNode.class);
+ remoteNode.setNodeId(nodeId);
+ remoteNode.setHttpAddress(httpAddress);
+ remoteNode.setRackName(rackName);
+ remoteNode.setNodePartition(nodePartition);
+ return remoteNode;
+ }
+
+ /**
* Get {@link NodeId}.
* @return NodeId.
*/
@@ -117,6 +137,23 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
* @param other RemoteNode.
* @return Comparison.
*/
+
+ /**
+ * Get Node Partition.
+ * @return Node Partition, or {@code null} if none is set.
+ */
+ @Private
+ @Unstable
+ public abstract String getNodePartition();
+
+ /**
+ * Set Node Partition.
+ * @param nodePartition Node Partition.
+ */
+ @Private
+ @Unstable
+ public abstract void setNodePartition(String nodePartition);
+
@Override
public int compareTo(RemoteNode other) {
return this.getNodeId().compareTo(other.getNodeId());
@@ -127,6 +164,7 @@ public abstract class RemoteNode implements Comparable<RemoteNode> {
return "RemoteNode{" +
"nodeId=" + getNodeId() + ", " +
"rackName=" + getRackName() + ", " +
- "httpAddress=" + getHttpAddress() + "}";
+ "httpAddress=" + getHttpAddress() + ", " +
+ "partition=" + getNodePartition() + "}";
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a69239d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
index c2492cf..8fb4357 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RemoteNodePBImpl.java
@@ -137,6 +137,25 @@ public class RemoteNodePBImpl extends RemoteNode {
}
@Override
+ public String getNodePartition() {
+ RemoteNodeProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasNodePartition()) {
+ return null;
+ }
+ return (p.getNodePartition());
+ }
+
+ @Override
+ public void setNodePartition(String nodePartition) {
+ maybeInitBuilder();
+ if (nodePartition == null) {
+ builder.clearNodePartition();
+ return;
+ }
+ builder.setNodePartition(nodePartition);
+ }
+
+ @Override
public int hashCode() {
return getProto().hashCode();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a69239d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
index 1f53648..ae1ba43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/scheduler/OpportunisticContainerAllocator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.scheduler;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -461,10 +462,17 @@ public class OpportunisticContainerAllocator {
private Collection<RemoteNode> findNodeCandidates(int loopIndex,
Map<String, RemoteNode> allNodes, Set<String> blackList,
EnrichedResourceRequest enrichedRR) {
+ LinkedList<RemoteNode> retList = new LinkedList<>();
+ String partition = getRequestPartition(enrichedRR);
if (loopIndex > 1) {
- return allNodes.values();
+ for (RemoteNode remoteNode : allNodes.values()) {
+ if (StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) {
+ retList.add(remoteNode);
+ }
+ }
+ return retList;
} else {
- LinkedList<RemoteNode> retList = new LinkedList<>();
+
int numContainers = enrichedRR.getRequest().getNumContainers();
while (numContainers > 0) {
if (loopIndex == 0) {
@@ -489,8 +497,10 @@ public class OpportunisticContainerAllocator {
private int collectRackLocalCandidates(Map<String, RemoteNode> allNodes,
EnrichedResourceRequest enrichedRR, LinkedList<RemoteNode> retList,
Set<String> blackList, int numContainers) {
+ String partition = getRequestPartition(enrichedRR);
for (RemoteNode rNode : allNodes.values()) {
- if (enrichedRR.getRackLocations().contains(rNode.getRackName())) {
+ if (StringUtils.equals(partition, getRemoteNodePartition(rNode)) &&
+ enrichedRR.getRackLocations().contains(rNode.getRackName())) {
if (blackList.contains(rNode.getNodeId().getHost())) {
retList.addLast(rNode);
} else {
@@ -508,9 +518,11 @@ public class OpportunisticContainerAllocator {
private int collectNodeLocalCandidates(Map<String, RemoteNode> allNodes,
EnrichedResourceRequest enrichedRR, List<RemoteNode> retList,
int numContainers) {
+ String partition = getRequestPartition(enrichedRR);
for (String nodeName : enrichedRR.getNodeLocations()) {
RemoteNode remoteNode = allNodes.get(nodeName);
- if (remoteNode != null) {
+ if (remoteNode != null &&
+ StringUtils.equals(partition, getRemoteNodePartition(remoteNode))) {
retList.add(remoteNode);
numContainers--;
}
@@ -563,7 +575,7 @@ public class OpportunisticContainerAllocator {
capability, currTime + tokenExpiry,
tokenSecretManager.getCurrentKey().getKeyId(), rmIdentifier,
schedulerKey.getPriority(), currTime,
- null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,
+ null, getRemoteNodePartition(node), ContainerType.TASK,
ExecutionType.OPPORTUNISTIC, schedulerKey.getAllocationRequestId());
byte[] pwd =
tokenSecretManager.createPassword(containerTokenIdentifier);
@@ -616,4 +628,20 @@ public class OpportunisticContainerAllocator {
}
return partitionedRequests;
}
+
+ private String getRequestPartition(EnrichedResourceRequest enrichedRR) {
+ String partition = enrichedRR.getRequest().getNodeLabelExpression();
+ if (partition == null) {
+ partition = CommonNodeLabelsManager.NO_LABEL;
+ }
+ return partition;
+ }
+
+ private String getRemoteNodePartition(RemoteNode node) {
+ String partition = node.getNodePartition();
+ if (partition == null) {
+ partition = CommonNodeLabelsManager.NO_LABEL;
+ }
+ return partition;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a69239d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 1b090bf..387ddb4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -31,6 +31,7 @@ message RemoteNodeProto {
optional NodeIdProto node_id = 1;
optional string http_address = 2;
optional string rack_name = 3;
+ optional string node_partition = 4;
}
message RegisterDistributedSchedulingAMResponseProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a69239d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
index 788b0b3..2d3b099 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
@@ -596,4 +596,41 @@ public class TestOpportunisticContainerAllocator {
}
Assert.assertEquals(100, containers.size());
}
+
+ @Test
+ public void testAllocationWithNodeLabels() throws Exception {
+ ResourceBlacklistRequest blacklistRequest =
+ ResourceBlacklistRequest.newInstance(
+ new ArrayList<>(), new ArrayList<>());
+ List<ResourceRequest> reqs =
+ Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+ "*", Resources.createResource(1 * GB), 1, true, "label",
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true)));
+ ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(0L, 1), 1);
+
+ oppCntxt.updateNodeList(
+ Arrays.asList(
+ RemoteNode.newInstance(
+ NodeId.newInstance("h1", 1234), "h1:1234", "/r1")));
+ List<Container> containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+ /* Since there is no node satisfying node label constraints, requests
+ won't get fulfilled.
+ */
+ Assert.assertEquals(0, containers.size());
+ Assert.assertEquals(1, oppCntxt.getOutstandingOpReqs().size());
+
+ oppCntxt.updateNodeList(
+ Arrays.asList(
+ RemoteNode.newInstance(
+ NodeId.newInstance("h1", 1234), "h1:1234", "/r1",
+ "label")));
+
+ containers = allocator.allocateContainers(
+ blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+ Assert.assertEquals(1, containers.size());
+ Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a69239d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index ce425df..9b13627 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
@@ -174,6 +175,16 @@ public class OpportunisticContainerAllocatorAMService
appAttempt.getOpportunisticContainerContext();
oppCtx.updateNodeList(getLeastLoadedNodes());
+ if (!partitionedAsks.getOpportunistic().isEmpty()) {
+ String appPartition = appAttempt.getAppAMNodePartitionName();
+
+ for (ResourceRequest req : partitionedAsks.getOpportunistic()) {
+ if (null == req.getNodeLabelExpression()) {
+ req.setNodeLabelExpression(appPartition);
+ }
+ }
+ }
+
List<Container> oppContainers =
oppContainerAllocator.allocateContainers(
request.getResourceBlacklistRequest(),
@@ -436,6 +447,7 @@ public class OpportunisticContainerAllocatorAMService
if (node != null) {
RemoteNode rNode = RemoteNode.newInstance(nodeId, node.getHttpAddress());
rNode.setRackName(node.getRackName());
+ rNode.setNodePartition(node.getPartition());
return rNode;
}
return null;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a69239d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index efa76bc..5542157 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -927,6 +927,8 @@ public class TestOpportunisticContainerAllocatorAMService {
distAllReq.getProto()));
Assert.assertEquals(
"h1", dsAllocResp.getNodesForScheduling().get(0).getNodeId().getHost());
+ Assert.assertEquals(
+ "l1", dsAllocResp.getNodesForScheduling().get(1).getNodePartition());
FinishApplicationMasterResponse dsfinishResp =
new FinishApplicationMasterResponsePBImpl(
@@ -1004,9 +1006,13 @@ public class TestOpportunisticContainerAllocatorAMService {
.getExecutionTypeRequest().getEnforceExecutionType());
DistributedSchedulingAllocateResponse resp = factory
.newRecordInstance(DistributedSchedulingAllocateResponse.class);
+ RemoteNode remoteNode1 = RemoteNode.newInstance(
+ NodeId.newInstance("h1", 1234), "http://h1:4321");
+ RemoteNode remoteNode2 = RemoteNode.newInstance(
+ NodeId.newInstance("h2", 1234), "http://h2:4321");
+ remoteNode2.setNodePartition("l1");
resp.setNodesForScheduling(
- Arrays.asList(RemoteNode.newInstance(
- NodeId.newInstance("h1", 1234), "http://h1:4321")));
+ Arrays.asList(remoteNode1, remoteNode2));
return resp;
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org