You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/04/18 00:36:02 UTC
[34/50] [abbrv] hadoop git commit: YARN-3354. Add node label
expression in ContainerTokenIdentifier to support RM recovery. Contributed by
Wangda Tan
YARN-3354. Add node label expression in ContainerTokenIdentifier to support RM recovery. Contributed by Wangda Tan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/25366bc7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/25366bc7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/25366bc7
Branch: refs/heads/YARN-2928
Commit: 25366bc7847899971f170e110ae5a7c83982d4e1
Parents: 1aa894e
Author: Jian He <ji...@apache.org>
Authored: Wed Apr 15 13:57:06 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Fri Apr 17 15:29:45 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../yarn/security/ContainerTokenIdentifier.java | 21 +-
.../main/proto/server/yarn_security_token.proto | 1 +
.../api/protocolrecords/NMContainerStatus.java | 22 +-
.../impl/pb/NMContainerStatusPBImpl.java | 21 +-
.../yarn_server_common_service_protos.proto | 1 +
.../container/ContainerImpl.java | 7 +-
.../containermanager/TestContainerManager.java | 2 +-
.../rmcontainer/RMContainer.java | 2 +
.../rmcontainer/RMContainerImpl.java | 26 +-
.../scheduler/AbstractYarnScheduler.java | 2 +-
.../scheduler/SchedulerApplicationAttempt.java | 13 +-
.../scheduler/common/fica/FiCaSchedulerApp.java | 7 +-
.../security/RMContainerTokenSecretManager.java | 7 +-
.../server/resourcemanager/TestRMRestart.java | 13 +-
.../capacity/TestContainerAllocation.java | 5 +-
...TestWorkPreservingRMRestartForNodeLabel.java | 282 +++++++++++++++++++
17 files changed, 408 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8b08f98..1a58988 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -140,6 +140,9 @@ Release 2.8.0 - UNRELEASED
YARN-3326. Support RESTful API for getLabelsToNodes. (Naganarasimha G R
via ozawa)
+ YARN-3354. Add node label expression in ContainerTokenIdentifier to support
+ RM recovery. (Wangda Tan via jianhe)
+
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
index 593bfc3..9a60d01 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerTokenIdentifier.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
import com.google.protobuf.TextFormat;
@@ -64,13 +65,14 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
String hostName, String appSubmitter, Resource r, long expiryTimeStamp,
int masterKeyId, long rmIdentifier, Priority priority, long creationTime) {
this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId,
- rmIdentifier, priority, creationTime, null);
+ rmIdentifier, priority, creationTime, null,
+ CommonNodeLabelsManager.NO_LABEL);
}
public ContainerTokenIdentifier(ContainerId containerID, String hostName,
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
long rmIdentifier, Priority priority, long creationTime,
- LogAggregationContext logAggregationContext) {
+ LogAggregationContext logAggregationContext, String nodeLabelExpression) {
ContainerTokenIdentifierProto.Builder builder =
ContainerTokenIdentifierProto.newBuilder();
if (containerID != null) {
@@ -93,6 +95,11 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
builder.setLogAggregationContext(
((LogAggregationContextPBImpl)logAggregationContext).getProto());
}
+
+ if (nodeLabelExpression != null) {
+ builder.setNodeLabelExpression(nodeLabelExpression);
+ }
+
proto = builder.build();
}
@@ -186,6 +193,16 @@ public class ContainerTokenIdentifier extends TokenIdentifier {
return UserGroupInformation.createRemoteUser(
containerId);
}
+
+ /**
+ * Get the node-label-expression in the original ResourceRequest
+ */
+ public String getNodeLabelExpression() {
+ if (proto.hasNodeLabelExpression()) {
+ return proto.getNodeLabelExpression();
+ }
+ return CommonNodeLabelsManager.NO_LABEL;
+ }
// TODO: Needed?
@InterfaceAudience.Private
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto
index 317032d..d1bef21 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/proto/server/yarn_security_token.proto
@@ -49,6 +49,7 @@ message ContainerTokenIdentifierProto {
optional PriorityProto priority = 8;
optional int64 creationTime = 9;
optional LogAggregationContextProto logAggregationContext = 10;
+ optional string nodeLabelExpression = 11;
}
message ClientToAMTokenIdentifierProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java
index 2f8f92d..4067c11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NMContainerStatus.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.util.Records;
/**
@@ -31,11 +32,21 @@ import org.apache.hadoop.yarn.util.Records;
* inside YARN and by end-users.
*/
public abstract class NMContainerStatus {
-
+
+ // Used by tests only
public static NMContainerStatus newInstance(ContainerId containerId,
ContainerState containerState, Resource allocatedResource,
String diagnostics, int containerExitStatus, Priority priority,
long creationTime) {
+ return newInstance(containerId, containerState, allocatedResource,
+ diagnostics, containerExitStatus, priority, creationTime,
+ CommonNodeLabelsManager.NO_LABEL);
+ }
+
+ public static NMContainerStatus newInstance(ContainerId containerId,
+ ContainerState containerState, Resource allocatedResource,
+ String diagnostics, int containerExitStatus, Priority priority,
+ long creationTime, String nodeLabelExpression) {
NMContainerStatus status =
Records.newRecord(NMContainerStatus.class);
status.setContainerId(containerId);
@@ -45,6 +56,7 @@ public abstract class NMContainerStatus {
status.setContainerExitStatus(containerExitStatus);
status.setPriority(priority);
status.setCreationTime(creationTime);
+ status.setNodeLabelExpression(nodeLabelExpression);
return status;
}
@@ -105,4 +117,12 @@ public abstract class NMContainerStatus {
public abstract long getCreationTime();
public abstract void setCreationTime(long creationTime);
+
+ /**
+ * Get the node-label-expression in the original ResourceRequest
+ */
+ public abstract String getNodeLabelExpression();
+
+ public abstract void setNodeLabelExpression(
+ String nodeLabelExpression);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java
index 86e1d97..624b89b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NMContainerStatusPBImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
@@ -207,6 +208,25 @@ public class NMContainerStatusPBImpl extends NMContainerStatus {
maybeInitBuilder();
builder.setCreationTime(creationTime);
}
+
+ @Override
+ public String getNodeLabelExpression() {
+ NMContainerStatusProtoOrBuilder p = viaProto ? proto : builder;
+ if (p.hasNodeLabelExpression()) {
+ return p.getNodeLabelExpression();
+ }
+ return CommonNodeLabelsManager.NO_LABEL;
+ }
+
+ @Override
+ public void setNodeLabelExpression(String nodeLabelExpression) {
+ maybeInitBuilder();
+ if (nodeLabelExpression == null) {
+ builder.clearNodeLabelExpression();
+ return;
+ }
+ builder.setNodeLabelExpression(nodeLabelExpression);
+ }
private void mergeLocalToBuilder() {
if (this.containerId != null
@@ -274,5 +294,4 @@ public class NMContainerStatusPBImpl extends NMContainerStatus {
private PriorityProto convertToProtoFormat(Priority t) {
return ((PriorityPBImpl)t).getProto();
}
-
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index caada23..7615c66 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -123,6 +123,7 @@ message NMContainerStatusProto {
optional string diagnostics = 5 [default = "N/A"];
optional int32 container_exit_status = 6;
optional int64 creation_time = 7;
+ optional string nodeLabelExpression = 8;
}
message SCMUploaderNotifyRequestProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 131d439..c9874a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -432,9 +432,10 @@ public class ContainerImpl implements Container {
this.readLock.lock();
try {
return NMContainerStatus.newInstance(this.containerId, getCurrentState(),
- getResource(), diagnostics.toString(), exitCode,
- containerTokenIdentifier.getPriority(),
- containerTokenIdentifier.getCreationTime());
+ getResource(), diagnostics.toString(), exitCode,
+ containerTokenIdentifier.getPriority(),
+ containerTokenIdentifier.getCreationTime(),
+ containerTokenIdentifier.getNodeLabelExpression());
} finally {
this.readLock.unlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 86cc4dc..34495a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -809,7 +809,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(cId, nodeId.toString(), user, r,
System.currentTimeMillis() + 100000L, 123, rmIdentifier,
- Priority.newInstance(0), 0, logAggregationContext);
+ Priority.newInstance(0), 0, logAggregationContext, null);
Token containerToken =
BuilderUtils
.newContainerToken(nodeId, containerTokenSecretManager
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index 20087f5..21d79ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -80,4 +80,6 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
List<ResourceRequest> getResourceRequests();
String getNodeHttpAddress();
+
+ String getNodeLabelExpression();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 38a03ae..2750d4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
@@ -153,6 +154,7 @@ public class RMContainerImpl implements RMContainer {
private final EventHandler eventHandler;
private final ContainerAllocationExpirer containerAllocationExpirer;
private final String user;
+ private final String nodeLabelExpression;
private Resource reservedResource;
private NodeId reservedNode;
@@ -162,17 +164,24 @@ public class RMContainerImpl implements RMContainer {
private ContainerStatus finishedStatus;
private boolean isAMContainer;
private List<ResourceRequest> resourceRequests;
-
+
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
RMContext rmContext) {
this(container, appAttemptId, nodeId, user, rmContext, System
- .currentTimeMillis());
+ .currentTimeMillis(), "");
+ }
+
+ public RMContainerImpl(Container container,
+ ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+ RMContext rmContext, String nodeLabelExpression) {
+ this(container, appAttemptId, nodeId, user, rmContext, System
+ .currentTimeMillis(), nodeLabelExpression);
}
public RMContainerImpl(Container container,
- ApplicationAttemptId appAttemptId, NodeId nodeId,
- String user, RMContext rmContext, long creationTime) {
+ ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
+ RMContext rmContext, long creationTime, String nodeLabelExpression) {
this.stateMachine = stateMachineFactory.make(this);
this.containerId = container.getId();
this.nodeId = nodeId;
@@ -185,6 +194,7 @@ public class RMContainerImpl implements RMContainer {
this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
this.isAMContainer = false;
this.resourceRequests = null;
+ this.nodeLabelExpression = nodeLabelExpression;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
@@ -597,4 +607,12 @@ public class RMContainerImpl implements RMContainer {
readLock.unlock();
}
}
+
+ @Override
+ public String getNodeLabelExpression() {
+ if (nodeLabelExpression == null) {
+ return RMNodeLabelsManager.NO_LABEL;
+ }
+ return nodeLabelExpression;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index e1f94cf..6699b05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -408,7 +408,7 @@ public abstract class AbstractYarnScheduler
RMContainer rmContainer =
new RMContainerImpl(container, attemptId, node.getNodeID(),
applications.get(attemptId.getApplicationId()).getUser(), rmContext,
- status.getCreationTime());
+ status.getCreationTime(), status.getNodeLabelExpression());
return rmContainer;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index fccf766..4823390 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Scheduli
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
@@ -466,9 +467,10 @@ public class SchedulerApplicationAttempt {
try {
// create container token and NMToken altogether.
container.setContainerToken(rmContext.getContainerTokenSecretManager()
- .createContainerToken(container.getId(), container.getNodeId(),
- getUser(), container.getResource(), container.getPriority(),
- rmContainer.getCreationTime(), this.logAggregationContext));
+ .createContainerToken(container.getId(), container.getNodeId(),
+ getUser(), container.getResource(), container.getPriority(),
+ rmContainer.getCreationTime(), this.logAggregationContext,
+ rmContainer.getNodeLabelExpression()));
NMToken nmToken =
rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
getApplicationAttemptId(), container);
@@ -703,4 +705,9 @@ public class SchedulerApplicationAttempt {
this.attemptResourceUsage, nodePartition, cluster,
schedulingMode);
}
+
+ @VisibleForTesting
+ public ResourceUsage getAppAttemptResourceUsage() {
+ return this.attemptResourceUsage;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index e041389..3085d93 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -146,9 +146,10 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
// Create RMContainer
- RMContainer rmContainer = new RMContainerImpl(container, this
- .getApplicationAttemptId(), node.getNodeID(),
- appSchedulingInfo.getUser(), this.rmContext);
+ RMContainer rmContainer =
+ new RMContainerImpl(container, this.getApplicationAttemptId(),
+ node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext,
+ request.getNodeLabelExpression());
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
index 1595d17..1c0533d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
@@ -179,7 +179,7 @@ public class RMContainerTokenSecretManager extends
String appSubmitter, Resource capability, Priority priority,
long createTime) {
return createContainerToken(containerId, nodeId, appSubmitter, capability,
- priority, createTime, null);
+ priority, createTime, null, null);
}
/**
@@ -196,7 +196,8 @@ public class RMContainerTokenSecretManager extends
*/
public Token createContainerToken(ContainerId containerId, NodeId nodeId,
String appSubmitter, Resource capability, Priority priority,
- long createTime, LogAggregationContext logAggregationContext) {
+ long createTime, LogAggregationContext logAggregationContext,
+ String nodeLabelExpression) {
byte[] password;
ContainerTokenIdentifier tokenIdentifier;
long expiryTimeStamp =
@@ -210,7 +211,7 @@ public class RMContainerTokenSecretManager extends
appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
.getMasterKey().getKeyId(),
ResourceManager.getClusterTimeStamp(), priority, createTime,
- logAggregationContext);
+ logAggregationContext, nodeLabelExpression);
password = this.createPassword(tokenIdentifier);
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
index a0b67f6..06a8830 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
@@ -1984,14 +1984,21 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
}
}
}
-
+
public static NMContainerStatus createNMContainerStatus(
ApplicationAttemptId appAttemptId, int id, ContainerState containerState) {
+ return createNMContainerStatus(appAttemptId, id, containerState,
+ RMNodeLabelsManager.NO_LABEL);
+ }
+
+ public static NMContainerStatus createNMContainerStatus(
+ ApplicationAttemptId appAttemptId, int id, ContainerState containerState,
+ String nodeLabelExpression) {
ContainerId containerId = ContainerId.newContainerId(appAttemptId, id);
NMContainerStatus containerReport =
NMContainerStatus.newInstance(containerId, containerState,
- Resource.newInstance(1024, 1), "recover container", 0,
- Priority.newInstance(0), 0);
+ Resource.newInstance(1024, 1), "recover container", 0,
+ Priority.newInstance(0), 0, nodeLabelExpression);
return containerReport;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 54ba617..47398e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -293,10 +293,11 @@ public class TestContainerAllocation {
public Token createContainerToken(ContainerId containerId,
NodeId nodeId, String appSubmitter, Resource capability,
Priority priority, long createTime,
- LogAggregationContext logAggregationContext) {
+ LogAggregationContext logAggregationContext, String nodeLabelExp) {
numRetries++;
return super.createContainerToken(containerId, nodeId, appSubmitter,
- capability, priority, createTime, logAggregationContext);
+ capability, priority, createTime, logAggregationContext,
+ nodeLabelExp);
}
};
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25366bc7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestWorkPreservingRMRestartForNodeLabel.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestWorkPreservingRMRestartForNodeLabel.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestWorkPreservingRMRestartForNodeLabel.java
new file mode 100644
index 0000000..fc9e14a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestWorkPreservingRMRestartForNodeLabel.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class TestWorkPreservingRMRestartForNodeLabel {
+ private Configuration conf;
+ private static final int GB = 1024; // 1024 MB
+
+ RMNodeLabelsManager mgr;
+
+  /**
+   * Prepares the shared configuration and node label manager before each
+   * test. The configuration selects {@link CapacityScheduler} as the
+   * scheduler class, switches on RM recovery together with work-preserving
+   * recovery, and names {@link MemoryRMStateStore} as the state store. A new
+   * {@link NullRMNodeLabelsManager} is then created and initialized with it.
+   */
+  @Before
+  public void setUp() throws Exception {
+    final Configuration yarnConf = new YarnConfiguration();
+    conf = yarnConf;
+
+    // Scheduler under test.
+    yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+
+    // Recovery related settings.
+    yarnConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    yarnConf.setBoolean(
+        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+    yarnConf.set(YarnConfiguration.RM_STORE,
+        MemoryRMStateStore.class.getName());
+
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(yarnConf);
+  }
+
+  /**
+   * Collects the given elements into a new hash set.
+   *
+   * @param elements the values to place in the set
+   * @return the set built by {@code Sets.newHashSet} from {@code elements}
+   */
+  @SuppressWarnings("unchecked")
+  private <E> Set<E> toSet(E... elements) {
+    return Sets.newHashSet(elements);
+  }
+
+  /**
+   * Asserts that the scheduler of the given RM knows the container and that
+   * the container reports the expected node label expression.
+   *
+   * @param containerId id of the container to look up
+   * @param rm the RM whose scheduler is queried
+   * @param labelExpression the node label expression the container must have
+   */
+  private void checkRMContainerLabelExpression(ContainerId containerId,
+      MockRM rm, String labelExpression) {
+    RMContainer rmContainer =
+        rm.getRMContext().getScheduler().getRMContainer(containerId);
+    String notFoundMessage = "Cannot find RMContainer=" + containerId;
+    Assert.assertNotNull(notFoundMessage, rmContainer);
+
+    String actualExpression = rmContainer.getNodeLabelExpression();
+    Assert.assertEquals(labelExpression, actualExpression);
+  }
+
+  /**
+   * Blocks until the scheduler of the given RM has an application attempt
+   * for {@code attemptId} and that attempt holds at least {@code num} live
+   * containers. Polls every 200 ms and prints a progress line on each
+   * round; there is no upper bound on the waiting time.
+   *
+   * @param num minimum number of live containers to wait for
+   * @param rm the RM whose scheduler is polled
+   * @param attemptId the application attempt to look up
+   * @throws Exception if the waiting thread is interrupted while sleeping
+   */
+  @SuppressWarnings("rawtypes")
+  public static void waitForNumContainersToRecover(int num, MockRM rm,
+      ApplicationAttemptId attemptId) throws Exception {
+    AbstractYarnScheduler scheduler =
+        (AbstractYarnScheduler) rm.getResourceScheduler();
+
+    // Phase 1: wait until the scheduler has created the attempt.
+    SchedulerApplicationAttempt attempt;
+    while ((attempt = scheduler.getApplicationAttempt(attemptId)) == null) {
+      System.out.println("Wait for scheduler attempt " + attemptId
+          + " to be created");
+      Thread.sleep(200);
+    }
+
+    // Phase 2: wait until enough containers are live in that attempt.
+    for (;;) {
+      if (attempt.getLiveContainers().size() >= num) {
+        return;
+      }
+      System.out.println("Wait for " + num
+          + " containers to recover. currently: "
+          + attempt.getLiveContainers().size());
+      Thread.sleep(200);
+    }
+  }
+
+  /**
+   * Asserts the memory used by the current attempt of an application in one
+   * partition, as reported by the capacity scheduler of the given RM.
+   *
+   * @param partition the partition (node label) whose usage is read
+   * @param appId the application to inspect
+   * @param rm the RM whose scheduler is queried
+   * @param expectedMemUsage the expected used memory
+   */
+  private void checkAppResourceUsage(String partition, ApplicationId appId,
+      MockRM rm, int expectedMemUsage) {
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm.getResourceScheduler();
+    FiCaSchedulerApp currentAttempt =
+        scheduler.getSchedulerApplications().get(appId).getCurrentAppAttempt();
+    Assert.assertEquals(expectedMemUsage,
+        currentAttempt.getAppAttemptResourceUsage().getUsed(partition)
+            .getMemory());
+  }
+
+  /**
+   * Asserts the memory used by a queue in one partition, as reported by the
+   * capacity scheduler of the given RM.
+   *
+   * @param partition the partition (node label) whose usage is read
+   * @param queueName name of the queue to inspect
+   * @param rm the RM whose scheduler is queried
+   * @param expectedMemUsage the expected used memory
+   */
+  private void checkQueueResourceUsage(String partition, String queueName,
+      MockRM rm, int expectedMemUsage) {
+    CapacityScheduler scheduler =
+        (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue targetQueue = scheduler.getQueue(queueName);
+    Assert.assertEquals(expectedMemUsage,
+        targetQueue.getQueueResourceUsage().getUsed(partition).getMemory());
+  }
+
+  /**
+   * Registers labels "x" and "y" with the current node label manager and
+   * assigns "x" to host h1 and "y" to host h2.
+   */
+  private void setupClusterNodeLabels() throws Exception {
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(
+        NodeId.newInstance("h1", 0), toSet("x"),
+        NodeId.newInstance("h2", 0), toSet("y")));
+  }
+
+  /**
+   * Creates a MockRM on top of the given state store whose node label
+   * manager is the one currently held in {@code mgr}.
+   */
+  private MockRM createRMWithLabelManager(MemoryRMStateStore store)
+      throws Exception {
+    return new MockRM(conf, store) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+  }
+
+  /**
+   * Launches the AM of {@code app} on {@code nm}, requests one extra 1 GB
+   * container, waits for container #2 to reach ALLOCATED on that node and
+   * asserts that containers #1 and #2 carry the expected label expression.
+   *
+   * @return the registered AM of the application
+   */
+  private MockAM launchAMAndCheckLabel(MockRM rm, MockNM nm, RMApp app,
+      String label) throws Exception {
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm);
+    ApplicationAttemptId attemptId = am.getApplicationAttemptId();
+
+    // request a container.
+    am.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    ContainerId allocatedId = ContainerId.newContainerId(attemptId, 2);
+    Assert.assertTrue(rm.waitForState(nm, allocatedId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+
+    checkRMContainerLabelExpression(
+        ContainerId.newContainerId(attemptId, 1), rm, label);
+    checkRMContainerLabelExpression(
+        ContainerId.newContainerId(attemptId, 2), rm, label);
+    return am;
+  }
+
+  /**
+   * Re-registers {@code nm} with two RUNNING container reports (#1 and #2)
+   * of the given attempt, waits until {@code rm} has recovered both and
+   * asserts that the recovered containers carry the expected label
+   * expression. The RM is a parameter so that the checks always run
+   * against the same RM that performed the recovery.
+   */
+  private void recoverContainersAndCheckLabel(MockRM rm, MockNM nm,
+      ApplicationAttemptId attemptId, String label) throws Exception {
+    NMContainerStatus first = TestRMRestart.createNMContainerStatus(
+        attemptId, 1, ContainerState.RUNNING, label);
+    NMContainerStatus second = TestRMRestart.createNMContainerStatus(
+        attemptId, 2, ContainerState.RUNNING, label);
+    nm.registerNode(Arrays.asList(first, second), null);
+    waitForNumContainersToRecover(2, rm, attemptId);
+
+    checkRMContainerLabelExpression(
+        ContainerId.newContainerId(attemptId, 1), rm, label);
+    checkRMContainerLabelExpression(
+        ContainerId.newContainerId(attemptId, 2), rm, label);
+  }
+
+  /**
+   * Verifies that a work-preserving RM restart restores the node label
+   * expression of running containers and the per-partition resource usage
+   * of applications and queues.
+   *
+   * Three apps are run on a first RM, one per queue (a1, b1, c1) and node
+   * (h1 = label x, h2 = label y, h3 = no label). A second RM is then
+   * started on the same state store, the NMs re-register with their
+   * container reports, and all assertions are made against the second RM.
+   */
+  @Test
+  public void testWorkPreservingRestartForNodeLabel() throws Exception {
+    // set node -> label
+    setupClusterNodeLabels();
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // NOTE(review): presumably configures queues a1/b1/c1 with default label
+    // expressions x / y / <empty>; confirm against TestUtils.
+    conf = TestUtils.getConfigurationWithDefaultQueueLabels(conf);
+
+    // inject node label manager
+    MockRM rm1 = createRMWithLabelManager(memStore);
+    MockRM rm2 = null;
+    try {
+      rm1.getRMContext().setNodeLabelManager(mgr);
+      rm1.start();
+      MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+      MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+      MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+
+      // launch an app to queue a1 (label = x), and check all container will
+      // be allocated in h1
+      RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+      MockAM am1 = launchAMAndCheckLabel(rm1, nm1, app1, "x");
+
+      // launch an app to queue b1 (label = y), and check all container will
+      // be allocated in h2
+      RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+      MockAM am2 = launchAMAndCheckLabel(rm1, nm2, app2, "y");
+
+      // launch an app to queue c1 (label = ""), and check all container will
+      // be allocated in h3
+      RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+      MockAM am3 = launchAMAndCheckLabel(rm1, nm3, app3, "");
+
+      // Re-start RM with a fresh label manager holding the same labels
+      mgr = new NullRMNodeLabelsManager();
+      mgr.init(conf);
+      setupClusterNodeLabels();
+      rm2 = createRMWithLabelManager(memStore);
+      rm2.start();
+      nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+      nm2.setResourceTrackerService(rm2.getResourceTrackerService());
+      nm3.setResourceTrackerService(rm2.getResourceTrackerService());
+
+      // recover containers on the restarted RM and check their labels there
+      recoverContainersAndCheckLabel(rm2, nm1,
+          am1.getApplicationAttemptId(), "x");
+      recoverContainersAndCheckLabel(rm2, nm2,
+          am2.getApplicationAttemptId(), "y");
+      recoverContainersAndCheckLabel(rm2, nm3,
+          am3.getApplicationAttemptId(), "");
+
+      // Check recovered resource usage (on the restarted RM, not the old one)
+      checkAppResourceUsage("x", app1.getApplicationId(), rm2, 2 * GB);
+      checkAppResourceUsage("y", app2.getApplicationId(), rm2, 2 * GB);
+      checkAppResourceUsage("", app3.getApplicationId(), rm2, 2 * GB);
+      checkQueueResourceUsage("x", "a1", rm2, 2 * GB);
+      checkQueueResourceUsage("y", "b1", rm2, 2 * GB);
+      checkQueueResourceUsage("", "c1", rm2, 2 * GB);
+      checkQueueResourceUsage("x", "a", rm2, 2 * GB);
+      checkQueueResourceUsage("y", "b", rm2, 2 * GB);
+      checkQueueResourceUsage("", "c", rm2, 2 * GB);
+      checkQueueResourceUsage("x", "root", rm2, 2 * GB);
+      checkQueueResourceUsage("y", "root", rm2, 2 * GB);
+      checkQueueResourceUsage("", "root", rm2, 2 * GB);
+    } finally {
+      // Always release both RMs, even when an assertion above fails.
+      rm1.close();
+      if (rm2 != null) {
+        rm2.close();
+      }
+    }
+  }
+}
\ No newline at end of file