You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/03/30 21:05:25 UTC
[2/2] hadoop git commit: YARN-2495. Allow admin specify labels from
each NM (Distributed configuration for node label). (Naganarasimha G R via
wangda)
YARN-2495. Allow admin specify labels from each NM (Distributed configuration for node label). (Naganarasimha G R via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2a945d24
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2a945d24
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2a945d24
Branch: refs/heads/trunk
Commit: 2a945d24f7de1a7ae6e7bd6636188ce3b55c7f52
Parents: b804571
Author: Wangda Tan <wa...@apache.org>
Authored: Mon Mar 30 12:04:51 2015 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Mon Mar 30 12:05:21 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 12 +
.../src/main/proto/yarn_protos.proto | 4 +
.../yarn/client/TestResourceTrackerOnHA.java | 2 +-
.../protocolrecords/NodeHeartbeatRequest.java | 8 +-
.../protocolrecords/NodeHeartbeatResponse.java | 3 +
.../RegisterNodeManagerRequest.java | 12 +
.../RegisterNodeManagerResponse.java | 3 +
.../impl/pb/NodeHeartbeatRequestPBImpl.java | 37 ++
.../impl/pb/NodeHeartbeatResponsePBImpl.java | 13 +
.../pb/RegisterNodeManagerRequestPBImpl.java | 48 ++-
.../pb/RegisterNodeManagerResponsePBImpl.java | 13 +
.../yarn_server_common_service_protos.proto | 4 +
.../hadoop/yarn/TestYarnServerApiClasses.java | 94 ++++
.../yarn/server/nodemanager/NodeManager.java | 34 +-
.../nodemanager/NodeStatusUpdaterImpl.java | 114 ++++-
.../nodelabels/NodeLabelsProvider.java | 43 ++
.../nodemanager/TestNodeStatusUpdater.java | 2 +-
.../TestNodeStatusUpdaterForLabels.java | 281 ++++++++++++
.../resourcemanager/ResourceTrackerService.java | 80 +++-
.../TestResourceTrackerService.java | 430 ++++++++++++++++++-
21 files changed, 1199 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b38c9ac..f72d06d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -83,6 +83,9 @@ Release 2.8.0 - UNRELEASED
YARN-3288. Document and fix indentation in the DockerContainerExecutor code
+ YARN-2495. Allow admin specify labels from each NM (Distributed
+ configuration for node label). (Naganarasimha G R via wangda)
+
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index a527af4..13e9a10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1719,6 +1719,18 @@ public class YarnConfiguration extends Configuration {
public static final String NODE_LABELS_ENABLED = NODE_LABELS_PREFIX
+ "enabled";
public static final boolean DEFAULT_NODE_LABELS_ENABLED = false;
+
+ public static final String NODELABEL_CONFIGURATION_TYPE =
+ NODE_LABELS_PREFIX + "configuration-type";
+
+ public static final String CENTALIZED_NODELABEL_CONFIGURATION_TYPE =
+ "centralized";
+
+ public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE =
+ "distributed";
+
+ public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
+ CENTALIZED_NODELABEL_CONFIGURATION_TYPE;
public YarnConfiguration() {
super();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 194be82..b396f4d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -239,6 +239,10 @@ message NodeIdToLabelsProto {
repeated string nodeLabels = 2;
}
+message StringArrayProto {
+ repeated string elements = 1;
+}
+
message LabelsToNodeIdsProto {
optional string nodeLabels = 1;
repeated NodeIdProto nodeId = 2;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
index 8885769..8167a58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
@@ -70,7 +70,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
null, null);
NodeHeartbeatRequest request2 =
- NodeHeartbeatRequest.newInstance(status, null, null);
+ NodeHeartbeatRequest.newInstance(status, null, null,null);
resourceTracker.nodeHeartbeat(request2);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index addd3fe..b80d9ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
+import java.util.Set;
+
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records;
@@ -26,7 +28,7 @@ public abstract class NodeHeartbeatRequest {
public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
MasterKey lastKnownContainerTokenMasterKey,
- MasterKey lastKnownNMTokenMasterKey) {
+ MasterKey lastKnownNMTokenMasterKey, Set<String> nodeLabels) {
NodeHeartbeatRequest nodeHeartbeatRequest =
Records.newRecord(NodeHeartbeatRequest.class);
nodeHeartbeatRequest.setNodeStatus(nodeStatus);
@@ -34,6 +36,7 @@ public abstract class NodeHeartbeatRequest {
.setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
nodeHeartbeatRequest
.setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
+ nodeHeartbeatRequest.setNodeLabels(nodeLabels);
return nodeHeartbeatRequest;
}
@@ -45,4 +48,7 @@ public abstract class NodeHeartbeatRequest {
public abstract MasterKey getLastKnownNMTokenMasterKey();
public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);
+
+ public abstract Set<String> getNodeLabels();
+ public abstract void setNodeLabels(Set<String> nodeLabels);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 9fb44ca..1498a0c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -67,4 +67,7 @@ public interface NodeHeartbeatResponse {
void setSystemCredentialsForApps(
Map<ApplicationId, ByteBuffer> systemCredentials);
+
+ boolean getAreNodeLabelsAcceptedByRM();
+ void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
index 366c32c..bf09b33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -31,6 +32,14 @@ public abstract class RegisterNodeManagerRequest {
int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications) {
+ return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
+ containerStatuses, runningApplications, null);
+ }
+
+ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
+ int httpPort, Resource resource, String nodeManagerVersionId,
+ List<NMContainerStatus> containerStatuses,
+ List<ApplicationId> runningApplications, Set<String> nodeLabels) {
RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort);
@@ -39,6 +48,7 @@ public abstract class RegisterNodeManagerRequest {
request.setNMVersion(nodeManagerVersionId);
request.setContainerStatuses(containerStatuses);
request.setRunningApplications(runningApplications);
+ request.setNodeLabels(nodeLabels);
return request;
}
@@ -47,6 +57,8 @@ public abstract class RegisterNodeManagerRequest {
public abstract Resource getResource();
public abstract String getNMVersion();
public abstract List<NMContainerStatus> getNMContainerStatuses();
+ public abstract Set<String> getNodeLabels();
+ public abstract void setNodeLabels(Set<String> nodeLabels);
/**
* We introduce this here because currently YARN RM doesn't persist nodes info
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
index b20803f..c8678f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
@@ -45,4 +45,7 @@ public interface RegisterNodeManagerResponse {
void setRMVersion(String version);
String getRMVersion();
+
+ boolean getAreNodeLabelsAcceptedByRM();
+ void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 26d1f19..16d47f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
@@ -36,6 +41,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private NodeStatus nodeStatus = null;
private MasterKey lastKnownContainerTokenMasterKey = null;
private MasterKey lastKnownNMTokenMasterKey = null;
+ private Set<String> labels = null;
public NodeHeartbeatRequestPBImpl() {
builder = NodeHeartbeatRequestProto.newBuilder();
@@ -80,6 +86,11 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
builder.setLastKnownNmTokenMasterKey(
convertToProtoFormat(this.lastKnownNMTokenMasterKey));
}
+ if (this.labels != null) {
+ builder.clearNodeLabels();
+ builder.setNodeLabels(StringArrayProto.newBuilder()
+ .addAllElements(this.labels).build());
+ }
}
private void mergeLocalToProto() {
@@ -178,4 +189,30 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl)t).getProto();
}
+
+ @Override
+ public Set<String> getNodeLabels() {
+ initNodeLabels();
+ return this.labels;
+ }
+
+ @Override
+ public void setNodeLabels(Set<String> nodeLabels) {
+ maybeInitBuilder();
+ builder.clearNodeLabels();
+ this.labels = nodeLabels;
+ }
+
+ private void initNodeLabels() {
+ if (this.labels != null) {
+ return;
+ }
+ NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasNodeLabels()) {
+ labels = null;
+ return;
+ }
+ StringArrayProto nodeLabels = p.getNodeLabels();
+ labels = new HashSet<String>(nodeLabels.getElementsList());
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 630a5bf..e27d8ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -483,5 +483,18 @@ public class NodeHeartbeatResponsePBImpl extends
private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl) t).getProto();
}
+
+ @Override
+ public boolean getAreNodeLabelsAcceptedByRM() {
+ NodeHeartbeatResponseProtoOrBuilder p =
+ this.viaProto ? this.proto : this.builder;
+ return p.getAreNodeLabelsAcceptedByRM();
+ }
+
+ @Override
+ public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) {
+ maybeInitBuilder();
+ this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
index ce4faec..1d2bb82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
@@ -20,32 +20,27 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
-import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-
-
public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
@@ -56,7 +51,8 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
private NodeId nodeId = null;
private List<NMContainerStatus> containerStatuses = null;
private List<ApplicationId> runningApplications = null;
-
+ private Set<String> labels = null;
+
public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder();
}
@@ -86,7 +82,11 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
if (this.nodeId != null) {
builder.setNodeId(convertToProtoFormat(this.nodeId));
}
-
+ if (this.labels != null) {
+ builder.clearNodeLabels();
+ builder.setNodeLabels(StringArrayProto.newBuilder()
+ .addAllElements(this.labels).build());
+ }
}
private synchronized void addNMContainerStatusesToProto() {
@@ -292,6 +292,32 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
builder.setNmVersion(version);
}
+ @Override
+ public Set<String> getNodeLabels() {
+ initNodeLabels();
+ return this.labels;
+ }
+
+ @Override
+ public void setNodeLabels(Set<String> nodeLabels) {
+ maybeInitBuilder();
+ builder.clearNodeLabels();
+ this.labels = nodeLabels;
+ }
+
+ private void initNodeLabels() {
+ if (this.labels != null) {
+ return;
+ }
+ RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasNodeLabels()) {
+ labels=null;
+ return;
+ }
+ StringArrayProto nodeLabels = p.getNodeLabels();
+ labels = new HashSet<String>(nodeLabels.getElementsList());
+ }
+
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
index ac329ed..391d00d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
@@ -216,4 +216,17 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl)t).getProto();
}
+
+ @Override
+ public boolean getAreNodeLabelsAcceptedByRM() {
+ RegisterNodeManagerResponseProtoOrBuilder p =
+ this.viaProto ? this.proto : this.builder;
+ return p.getAreNodeLabelsAcceptedByRM();
+ }
+
+ @Override
+ public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) {
+ maybeInitBuilder();
+ this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 91473c5..d8c92c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -32,6 +32,7 @@ message RegisterNodeManagerRequestProto {
optional string nm_version = 5;
repeated NMContainerStatusProto container_statuses = 6;
repeated ApplicationIdProto runningApplications = 7;
+ optional StringArrayProto nodeLabels = 8;
}
message RegisterNodeManagerResponseProto {
@@ -41,12 +42,14 @@ message RegisterNodeManagerResponseProto {
optional int64 rm_identifier = 4;
optional string diagnostics_message = 5;
optional string rm_version = 6;
+ optional bool areNodeLabelsAcceptedByRM = 7 [default = false];
}
message NodeHeartbeatRequestProto {
optional NodeStatusProto node_status = 1;
optional MasterKeyProto last_known_container_token_master_key = 2;
optional MasterKeyProto last_known_nm_token_master_key = 3;
+ optional StringArrayProto nodeLabels = 4;
}
message NodeHeartbeatResponseProto {
@@ -60,6 +63,7 @@ message NodeHeartbeatResponseProto {
optional string diagnostics_message = 8;
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
+ optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
}
message SystemCredentialsForAppsProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index 20983b6..d42b2c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -19,11 +19,13 @@
package org.apache.hadoop.yarn;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
@@ -46,6 +49,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
+import org.junit.Assert;
import org.junit.Test;
/**
@@ -77,7 +81,17 @@ public class TestYarnServerApiClasses {
assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
assertEquals(NodeAction.NORMAL, copy.getNodeAction());
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
+ assertFalse(copy.getAreNodeLabelsAcceptedByRM());
+ }
+ @Test
+ public void testRegisterNodeManagerResponsePBImplWithRMAcceptLbls() {
+ RegisterNodeManagerResponsePBImpl original =
+ new RegisterNodeManagerResponsePBImpl();
+ original.setAreNodeLabelsAcceptedByRM(true);
+ RegisterNodeManagerResponsePBImpl copy =
+ new RegisterNodeManagerResponsePBImpl(original.getProto());
+ assertTrue(copy.getAreNodeLabelsAcceptedByRM());
}
/**
@@ -89,11 +103,32 @@ public class TestYarnServerApiClasses {
original.setLastKnownContainerTokenMasterKey(getMasterKey());
original.setLastKnownNMTokenMasterKey(getMasterKey());
original.setNodeStatus(getNodeStatus());
+ original.setNodeLabels(getValidNodeLabels());
NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
original.getProto());
assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
+ // check labels are coming with valid values
+ Assert.assertTrue(original.getNodeLabels()
+ .containsAll(copy.getNodeLabels()));
+ // check for empty labels
+ original.setNodeLabels(new HashSet<String> ());
+ copy = new NodeHeartbeatRequestPBImpl(
+ original.getProto());
+ Assert.assertNotNull(copy.getNodeLabels());
+ Assert.assertEquals(0, copy.getNodeLabels().size());
+ }
+
+ /**
+ * Test NodeHeartbeatRequestPBImpl.
+ */
+ @Test
+ public void testNodeHeartbeatRequestPBImplWithNullLabels() {
+ NodeHeartbeatRequestPBImpl original = new NodeHeartbeatRequestPBImpl();
+ NodeHeartbeatRequestPBImpl copy =
+ new NodeHeartbeatRequestPBImpl(original.getProto());
+ Assert.assertNull(copy.getNodeLabels());
}
/**
@@ -119,6 +154,16 @@ public class TestYarnServerApiClasses {
assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
+ assertEquals(false, copy.getAreNodeLabelsAcceptedByRM());
+ }
+
+ @Test
+ public void testNodeHeartbeatResponsePBImplWithRMAcceptLbls() {
+ NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();
+ original.setAreNodeLabelsAcceptedByRM(true);
+ NodeHeartbeatResponsePBImpl copy =
+ new NodeHeartbeatResponsePBImpl(original.getProto());
+ assertTrue(copy.getAreNodeLabelsAcceptedByRM());
}
/**
@@ -208,6 +253,55 @@ public class TestYarnServerApiClasses {
}
+ @Test
+ public void testRegisterNodeManagerRequestWithNullLabels() {
+ RegisterNodeManagerRequest request =
+ RegisterNodeManagerRequest.newInstance(
+ NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0),
+ "version", null, null);
+
+ // serialze to proto, and get request from proto
+ RegisterNodeManagerRequest request1 =
+ new RegisterNodeManagerRequestPBImpl(
+ ((RegisterNodeManagerRequestPBImpl) request).getProto());
+
+ // check labels are coming with no values
+ Assert.assertNull(request1.getNodeLabels());
+ }
+
+ @Test
+ public void testRegisterNodeManagerRequestWithValidLabels() {
+ HashSet<String> nodeLabels = getValidNodeLabels();
+ RegisterNodeManagerRequest request =
+ RegisterNodeManagerRequest.newInstance(
+ NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0),
+ "version", null, null, nodeLabels);
+
+ // serialze to proto, and get request from proto
+ RegisterNodeManagerRequest copy =
+ new RegisterNodeManagerRequestPBImpl(
+ ((RegisterNodeManagerRequestPBImpl) request).getProto());
+
+ // check labels are coming with valid values
+ Assert.assertEquals(true, nodeLabels.containsAll(copy.getNodeLabels()));
+
+ // check for empty labels
+ request.setNodeLabels(new HashSet<String> ());
+ copy = new RegisterNodeManagerRequestPBImpl(
+ ((RegisterNodeManagerRequestPBImpl) request).getProto());
+ Assert.assertNotNull(copy.getNodeLabels());
+ Assert.assertEquals(0, copy.getNodeLabels().size());
+ }
+
+ private HashSet<String> getValidNodeLabels() {
+ HashSet<String> nodeLabels = new HashSet<String>();
+ nodeLabels.add("java");
+ nodeLabels.add("windows");
+ nodeLabels.add("gpu");
+ nodeLabels.add("x86");
+ return nodeLabels;
+ }
+
private ContainerStatus getContainerStatus(int applicationId,
int containerID, int appAttemptId) {
ContainerStatus status = recordFactory
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index a4be120..f95a7ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -80,6 +81,7 @@ public class NodeManager extends CompositeService
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
private ApplicationACLsManager aclsManager;
private NodeHealthCheckerService nodeHealthChecker;
+ private NodeLabelsProvider nodeLabelsProvider;
private LocalDirsHandlerService dirsHandler;
private Context context;
private AsyncDispatcher dispatcher;
@@ -98,7 +100,22 @@ public class NodeManager extends CompositeService
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
- metrics);
+ metrics, nodeLabelsProvider);
+ }
+
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+ NodeLabelsProvider nodeLabelsProvider) {
+ return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
+ metrics, nodeLabelsProvider);
+ }
+
+ @VisibleForTesting
+ protected NodeLabelsProvider createNodeLabelsProvider(
+ Configuration conf) throws IOException {
+ // TODO as part of YARN-2729
+ // Need to get the implementation of provider service and return
+ return null;
}
protected NodeResourceMonitor createNodeResourceMonitor() {
@@ -245,9 +262,18 @@ public class NodeManager extends CompositeService
this.context = createNMContext(containerTokenSecretManager,
nmTokenSecretManager, nmStore);
-
- nodeStatusUpdater =
- createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
+
+ nodeLabelsProvider = createNodeLabelsProvider(conf);
+
+ if (null == nodeLabelsProvider) {
+ nodeStatusUpdater =
+ createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
+ } else {
+ addService(nodeLabelsProvider);
+ nodeStatusUpdater =
+ createNodeStatusUpdater(context, dispatcher, nodeHealthChecker,
+ nodeLabelsProvider);
+ }
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 6ddd7e4..2549e0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
@@ -70,6 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting;
@@ -120,15 +123,25 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
+ private final NodeLabelsProvider nodeLabelsProvider;
+ private final boolean hasNodeLabelsProvider;
+
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+ this(context, dispatcher, healthChecker, metrics, null);
+ }
+
+ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
+ NodeLabelsProvider nodeLabelsProvider) {
super(NodeStatusUpdaterImpl.class.getName());
this.healthChecker = healthChecker;
+ this.nodeLabelsProvider = nodeLabelsProvider;
+ this.hasNodeLabelsProvider = (nodeLabelsProvider != null);
this.context = context;
this.dispatcher = dispatcher;
this.metrics = metrics;
- this.recentlyStoppedContainers =
- new LinkedHashMap<ContainerId, Long>();
+ this.recentlyStoppedContainers = new LinkedHashMap<ContainerId, Long>();
this.pendingCompletedContainers =
new HashMap<ContainerId, ContainerStatus>();
}
@@ -253,22 +266,30 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
protected void registerWithRM()
throws YarnException, IOException {
List<NMContainerStatus> containerReports = getNMContainerStatuses();
+ Set<String> nodeLabels = null;
+ if (hasNodeLabelsProvider) {
+ nodeLabels = nodeLabelsProvider.getNodeLabels();
+ nodeLabels =
+ (null == nodeLabels) ? CommonNodeLabelsManager.EMPTY_STRING_SET
+ : nodeLabels;
+ }
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
- nodeManagerVersionId, containerReports, getRunningApplications());
+ nodeManagerVersionId, containerReports, getRunningApplications(),
+ nodeLabels);
if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports);
}
RegisterNodeManagerResponse regNMResponse =
resourceTracker.registerNodeManager(request);
this.rmIdentifier = regNMResponse.getRMIdentifier();
- // if the Resourcemanager instructs NM to shutdown.
+ // if the Resource Manager instructs NM to shutdown.
if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
String message =
"Message from ResourceManager: "
+ regNMResponse.getDiagnosticsMessage();
throw new YarnRuntimeException(
- "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed, "
+ "Recieved SHUTDOWN signal from Resourcemanager, Registration of NodeManager failed, "
+ message);
}
@@ -306,8 +327,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
this.context.getNMTokenSecretManager().setMasterKey(masterKey);
}
- LOG.info("Registered with ResourceManager as " + this.nodeId
- + " with total resource of " + this.totalResource);
+ StringBuilder successfullRegistrationMsg = new StringBuilder();
+ successfullRegistrationMsg.append("Registered with ResourceManager as ")
+ .append(this.nodeId).append(" with total resource of ")
+ .append(this.totalResource);
+
+ if (regNMResponse.getAreNodeLabelsAcceptedByRM()) {
+ successfullRegistrationMsg
+ .append(" and with following Node label(s) : {")
+ .append(StringUtils.join(",", nodeLabels)).append("}");
+ } else if (hasNodeLabelsProvider) {
+ //case where provider is set but RM did not accept the Node Labels
+ LOG.error(regNMResponse.getDiagnosticsMessage());
+ }
+
+ LOG.info(successfullRegistrationMsg);
LOG.info("Notifying ContainerManager to unblock new container-requests");
((ContainerManagerImpl) this.context.getContainerManager())
.setBlockNewContainerRequests(false);
@@ -580,19 +614,41 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@Override
@SuppressWarnings("unchecked")
public void run() {
- int lastHeartBeatID = 0;
+ int lastHeartbeatID = 0;
+ Set<String> lastUpdatedNodeLabelsToRM = null;
+ if (hasNodeLabelsProvider) {
+ lastUpdatedNodeLabelsToRM = nodeLabelsProvider.getNodeLabels();
+ lastUpdatedNodeLabelsToRM =
+ (null == lastUpdatedNodeLabelsToRM) ? CommonNodeLabelsManager.EMPTY_STRING_SET
+ : lastUpdatedNodeLabelsToRM;
+ }
while (!isStopped) {
// Send heartbeat
try {
NodeHeartbeatResponse response = null;
- NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID);
-
+ Set<String> nodeLabelsForHeartbeat = null;
+ NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
+
+ if (hasNodeLabelsProvider) {
+ nodeLabelsForHeartbeat = nodeLabelsProvider.getNodeLabels();
+ //if the provider returns null then consider empty labels are set
+ nodeLabelsForHeartbeat =
+ (nodeLabelsForHeartbeat == null) ? CommonNodeLabelsManager.EMPTY_STRING_SET
+ : nodeLabelsForHeartbeat;
+ if (!areNodeLabelsUpdated(nodeLabelsForHeartbeat,
+ lastUpdatedNodeLabelsToRM)) {
+ //if nodelabels have not changed then no need to send
+ nodeLabelsForHeartbeat = null;
+ }
+ }
+
NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus,
- NodeStatusUpdaterImpl.this.context
- .getContainerTokenSecretManager().getCurrentKey(),
- NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
- .getCurrentKey());
+ NodeStatusUpdaterImpl.this.context
+ .getContainerTokenSecretManager().getCurrentKey(),
+ NodeStatusUpdaterImpl.this.context
+ .getNMTokenSecretManager().getCurrentKey(),
+ nodeLabelsForHeartbeat);
response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
@@ -623,6 +679,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
break;
}
+ if (response.getAreNodeLabelsAcceptedByRM()) {
+ lastUpdatedNodeLabelsToRM = nodeLabelsForHeartbeat;
+ LOG.info("Node Labels {"
+ + StringUtils.join(",", nodeLabelsForHeartbeat)
+ + "} were Accepted by RM ");
+ } else if (nodeLabelsForHeartbeat != null) {
+ // case where NodeLabelsProvider is set and updated labels were
+ // sent to RM and RM rejected the labels
+ LOG.error(response.getDiagnosticsMessage());
+ }
+
// Explicitly put this method after checking the resync response. We
// don't want to remove the completed containers before resync
// because these completed containers will be reported back to RM
@@ -631,7 +698,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
removeOrTrackCompletedContainersFromContext(response
.getContainersToBeRemovedFromNM());
- lastHeartBeatID = response.getResponseId();
+ lastHeartbeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanup();
if (!containersToCleanup.isEmpty()) {
@@ -680,6 +747,23 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
}
+ /**
+ * Caller should take care of sending non null nodelabels for both
+ * arguments
+ *
+ * @param nodeLabelsNew
+ * @param nodeLabelsOld
+ * @return if the New node labels are diff from the older one.
+ */
+ private boolean areNodeLabelsUpdated(Set<String> nodeLabelsNew,
+ Set<String> nodeLabelsOld) {
+ if (nodeLabelsNew.size() != nodeLabelsOld.size()
+ || !nodeLabelsOld.containsAll(nodeLabelsNew)) {
+ return true;
+ }
+ return false;
+ }
+
private void updateMasterKeys(NodeHeartbeatResponse response) {
// See if the master-key has rolled over
MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java
new file mode 100644
index 0000000..4b34d76
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/nodelabels/NodeLabelsProvider.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
+
+import java.util.Set;
+
+import org.apache.hadoop.service.AbstractService;
+
+/**
+ * Interface which will be responsible for fetching the labels
+ *
+ */
+public abstract class NodeLabelsProvider extends AbstractService {
+
+ public NodeLabelsProvider(String name) {
+ super(name);
+ }
+
+ /**
+ * Provides the labels. LabelProvider is expected to give same Labels
+ * continuously until there is a change in labels.
+ * If null is returned then Empty label set is assumed by the caller.
+ *
+ * @return Set of node label strings applicable for a node
+ */
+ public abstract Set<String> getNodeLabels();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 71a420e..fc404de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -1182,7 +1182,7 @@ public class TestNodeStatusUpdater {
}
};
verifyNodeStartFailure(
- "Recieved SHUTDOWN signal from Resourcemanager ,"
+ "Recieved SHUTDOWN signal from Resourcemanager, "
+ "Registration of NodeManager failed, "
+ "Message from ResourceManager: RM Shutting Down Node");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
new file mode 100644
index 0000000..437e4c8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdaterForLabels.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
+import org.apache.hadoop.yarn.server.api.ResourceTracker;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
+import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
+import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
+ private static final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
+
+ private NodeManager nm;
+ protected DummyNodeLabelsProvider dummyLabelsProviderRef;
+
+ @Before
+ public void setup() {
+ dummyLabelsProviderRef = new DummyNodeLabelsProvider();
+ }
+
+ @After
+ public void tearDown() {
+ if (null != nm) {
+ ServiceOperations.stop(nm);
+ }
+ }
+
+ private class ResourceTrackerForLabels implements ResourceTracker {
+ int heartbeatID = 0;
+ Set<String> labels;
+
+ private boolean receivedNMHeartbeat = false;
+ private boolean receivedNMRegister = false;
+
+ private MasterKey createMasterKey() {
+ MasterKey masterKey = new MasterKeyPBImpl();
+ masterKey.setKeyId(123);
+ masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
+ .byteValue() }));
+ return masterKey;
+ }
+
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnException, IOException {
+ labels = request.getNodeLabels();
+ RegisterNodeManagerResponse response =
+ recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
+ response.setNodeAction(NodeAction.NORMAL);
+ response.setContainerTokenMasterKey(createMasterKey());
+ response.setNMTokenMasterKey(createMasterKey());
+ response.setAreNodeLabelsAcceptedByRM(labels != null);
+ synchronized (ResourceTrackerForLabels.class) {
+ receivedNMRegister = true;
+ ResourceTrackerForLabels.class.notifyAll();
+ }
+ return response;
+ }
+
+ public void waitTillHeartbeat() {
+ if (receivedNMHeartbeat) {
+ return;
+ }
+ int i = 500;
+ while (!receivedNMHeartbeat && i > 0) {
+ synchronized (ResourceTrackerForLabels.class) {
+ if (!receivedNMHeartbeat) {
+ try {
+ System.out
+ .println("In ResourceTrackerForLabels waiting for heartbeat : "
+ + System.currentTimeMillis());
+ ResourceTrackerForLabels.class.wait(500l);
+ // to avoid race condition, i.e. sendOutofBandHeartBeat can be
+ // sent before NSU thread has gone to sleep, hence we wait and try
+ // to resend heartbeat again
+ nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
+ ResourceTrackerForLabels.class.wait(500l);
+ i--;
+ } catch (InterruptedException e) {
+ Assert.fail("Exception caught while waiting for Heartbeat");
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ if (!receivedNMHeartbeat) {
+ Assert.fail("Heartbeat dint receive even after waiting");
+ }
+ }
+
+ public void waitTillRegister() {
+ if (receivedNMRegister) {
+ return;
+ }
+ while (!receivedNMRegister) {
+ synchronized (ResourceTrackerForLabels.class) {
+ try {
+ ResourceTrackerForLabels.class.wait();
+ } catch (InterruptedException e) {
+ Assert.fail("Exception caught while waiting for register");
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ /**
+ * Flag to indicate received any
+ */
+ public void resetNMHeartbeatReceiveFlag() {
+ synchronized (ResourceTrackerForLabels.class) {
+ receivedNMHeartbeat = false;
+ }
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnException, IOException {
+ System.out.println("RTS receive heartbeat : "
+ + System.currentTimeMillis());
+ labels = request.getNodeLabels();
+ NodeStatus nodeStatus = request.getNodeStatus();
+ nodeStatus.setResponseId(heartbeatID++);
+
+ NodeHeartbeatResponse nhResponse =
+ YarnServerBuilderUtils.newNodeHeartbeatResponse(heartbeatID,
+ NodeAction.NORMAL, null, null, null, null, 1000L);
+
+ // to ensure that heartbeats are sent only when required.
+ nhResponse.setNextHeartBeatInterval(Long.MAX_VALUE);
+ nhResponse.setAreNodeLabelsAcceptedByRM(labels != null);
+
+ synchronized (ResourceTrackerForLabels.class) {
+ receivedNMHeartbeat = true;
+ ResourceTrackerForLabels.class.notifyAll();
+ }
+ return nhResponse;
+ }
+ }
+
+ public static class DummyNodeLabelsProvider extends NodeLabelsProvider {
+
+ @SuppressWarnings("unchecked")
+ private Set<String> nodeLabels = Collections.EMPTY_SET;
+
+ public DummyNodeLabelsProvider() {
+ super(DummyNodeLabelsProvider.class.getName());
+ }
+
+ @Override
+ public synchronized Set<String> getNodeLabels() {
+ return nodeLabels;
+ }
+
+ synchronized void setNodeLabels(Set<String> nodeLabels) {
+ this.nodeLabels = nodeLabels;
+ }
+ }
+
+ private YarnConfiguration createNMConfigForDistributeNodeLabels() {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
+ YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);
+ return conf;
+ }
+
+ @Test
+ public void testNodeStatusUpdaterForNodeLabels() throws InterruptedException,
+ IOException {
+ final ResourceTrackerForLabels resourceTracker =
+ new ResourceTrackerForLabels();
+ nm = new NodeManager() {
+ @Override
+ protected NodeLabelsProvider createNodeLabelsProvider(
+ Configuration conf) throws IOException {
+ return dummyLabelsProviderRef;
+ }
+
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+ NodeLabelsProvider labelsProvider) {
+
+ return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
+ metrics, labelsProvider) {
+ @Override
+ protected ResourceTracker getRMClient() {
+ return resourceTracker;
+ }
+
+ @Override
+ protected void stopRMProxy() {
+ return;
+ }
+ };
+ }
+ };
+
+ YarnConfiguration conf = createNMConfigForDistributeNodeLabels();
+ nm.init(conf);
+ resourceTracker.resetNMHeartbeatReceiveFlag();
+ nm.start();
+ resourceTracker.waitTillRegister();
+ assertCollectionEquals(resourceTracker.labels,
+ dummyLabelsProviderRef.getNodeLabels());
+
+ resourceTracker.waitTillHeartbeat();// wait till the first heartbeat
+ resourceTracker.resetNMHeartbeatReceiveFlag();
+
+ // heartbeat with updated labels
+ dummyLabelsProviderRef.setNodeLabels(toSet("P"));
+
+ nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
+ resourceTracker.waitTillHeartbeat();
+ assertCollectionEquals(resourceTracker.labels,
+ dummyLabelsProviderRef.getNodeLabels());
+ resourceTracker.resetNMHeartbeatReceiveFlag();
+
+ // heartbeat without updating labels
+ nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
+ resourceTracker.waitTillHeartbeat();
+ resourceTracker.resetNMHeartbeatReceiveFlag();
+ assertNull(
+ "If no change in labels then null should be sent as part of request",
+ resourceTracker.labels);
+
+ // provider return with null labels
+ dummyLabelsProviderRef.setNodeLabels(null);
+ nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
+ resourceTracker.waitTillHeartbeat();
+ assertTrue("If provider sends null then empty labels should be sent",
+ resourceTracker.labels.isEmpty());
+ resourceTracker.resetNMHeartbeatReceiveFlag();
+
+ nm.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2a945d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 0de556b..22efe25 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -21,6 +21,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
@@ -31,6 +34,7 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -100,6 +104,8 @@ public class ResourceTrackerService extends AbstractService implements
private int minAllocMb;
private int minAllocVcores;
+ private boolean isDistributesNodeLabelsConf;
+
static {
resync.setNodeAction(NodeAction.RESYNC);
@@ -149,6 +155,14 @@ public class ResourceTrackerService extends AbstractService implements
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
+ String nodeLabelConfigurationType =
+ conf.get(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
+ YarnConfiguration.DEFAULT_NODELABEL_CONFIGURATION_TYPE);
+
+ isDistributesNodeLabelsConf =
+ YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE
+ .equals(nodeLabelConfigurationType);
+
super.serviceInit(conf);
}
@@ -336,11 +350,31 @@ public class ResourceTrackerService extends AbstractService implements
}
}
- String message =
- "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "
- + httpPort + ") " + "registered with capability: " + capability
- + ", assigned nodeId " + nodeId;
- LOG.info(message);
+ // Update node's labels to RM's NodeLabelManager.
+ Set<String> nodeLabels = request.getNodeLabels();
+ if (isDistributesNodeLabelsConf && nodeLabels != null) {
+ try {
+ updateNodeLabelsFromNMReport(nodeLabels, nodeId);
+ response.setAreNodeLabelsAcceptedByRM(true);
+ } catch (IOException ex) {
+ // Ensure the exception is captured in the response
+ response.setDiagnosticsMessage(ex.getMessage());
+ response.setAreNodeLabelsAcceptedByRM(false);
+ }
+ }
+
+ StringBuilder message = new StringBuilder();
+ message.append("NodeManager from node ").append(host).append("(cmPort: ")
+ .append(cmPort).append(" httpPort: ");
+ message.append(httpPort).append(") ")
+ .append("registered with capability: ").append(capability);
+ message.append(", assigned nodeId ").append(nodeId);
+ if (response.getAreNodeLabelsAcceptedByRM()) {
+ message.append(", node labels { ").append(
+ StringUtils.join(",", nodeLabels) + " } ");
+ }
+
+ LOG.info(message.toString());
response.setNodeAction(NodeAction.NORMAL);
response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
response.setRMVersion(YarnVersionInfo.getVersion());
@@ -359,6 +393,7 @@ public class ResourceTrackerService extends AbstractService implements
* 2. Check if it's a registered node
* 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
* 4. Send healthStatus to RMNode
+ * 5. Update node's labels if distributed Node Labels configuration is enabled
*/
NodeId nodeId = remoteNodeStatus.getNodeId();
@@ -428,9 +463,44 @@ public class ResourceTrackerService extends AbstractService implements
remoteNodeStatus.getContainersStatuses(),
remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
+ // 5. Update node's labels to RM's NodeLabelManager.
+ if (isDistributesNodeLabelsConf && request.getNodeLabels() != null) {
+ try {
+ updateNodeLabelsFromNMReport(request.getNodeLabels(), nodeId);
+ nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true);
+ } catch (IOException ex) {
+ //ensure the error message is captured and sent across in response
+ nodeHeartBeatResponse.setDiagnosticsMessage(ex.getMessage());
+ nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(false);
+ }
+ }
+
return nodeHeartBeatResponse;
}
+ private void updateNodeLabelsFromNMReport(Set<String> nodeLabels,
+ NodeId nodeId) throws IOException {
+ try {
+ Map<NodeId, Set<String>> labelsUpdate =
+ new HashMap<NodeId, Set<String>>();
+ labelsUpdate.put(nodeId, nodeLabels);
+ this.rmContext.getNodeLabelManager().replaceLabelsOnNode(labelsUpdate);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Node Labels {" + StringUtils.join(",", nodeLabels)
+ + "} from Node " + nodeId + " were Accepted from RM");
+ }
+ } catch (IOException ex) {
+ StringBuilder errorMessage = new StringBuilder();
+ errorMessage.append("Node Labels {")
+ .append(StringUtils.join(",", nodeLabels))
+ .append("} reported from NM with ID ").append(nodeId)
+ .append(" was rejected from RM with exception message as : ")
+ .append(ex.getMessage());
+ LOG.error(errorMessage, ex);
+ throw new IOException(errorMessage.toString(), ex);
+ }
+ }
+
private void populateKeys(NodeHeartbeatRequest request,
NodeHeartbeatResponse nodeHeartBeatResponse) {