You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink that is not preserved in this text rendering.)
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/22 20:26:59 UTC
[25/32] hadoop git commit: YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to support container resizing. Contributed by Meng Ding
YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to support container resizing. Contributed by Meng Ding
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1e4034a8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1e4034a8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1e4034a8
Branch: refs/heads/YARN-1197
Commit: 1e4034a84f668356f47f8af8abbdc6814115c17f
Parents: de9bb82
Author: Jian He <ji...@apache.org>
Authored: Thu Aug 20 21:04:14 2015 -0700
Committer: Wangda Tan <wa...@apache.org>
Committed: Tue Sep 22 11:25:28 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../yarn/client/TestResourceTrackerOnHA.java | 2 +-
.../protocolrecords/NodeHeartbeatResponse.java | 4 +
.../impl/pb/NodeHeartbeatResponsePBImpl.java | 76 +++++-
.../yarn/server/api/records/NodeStatus.java | 15 +-
.../api/records/impl/pb/NodeStatusPBImpl.java | 75 +++++-
.../main/proto/yarn_server_common_protos.proto | 3 +-
.../yarn_server_common_service_protos.proto | 1 +
.../hadoop/yarn/TestYarnServerApiClasses.java | 39 ++-
.../hadoop/yarn/server/nodemanager/Context.java | 3 +
.../yarn/server/nodemanager/NodeManager.java | 10 +
.../nodemanager/NodeStatusUpdaterImpl.java | 57 +++-
.../containermanager/ContainerManagerImpl.java | 159 +++++++-----
.../nodemanager/TestNodeManagerResync.java | 258 +++++++++++++++++++
.../amrmproxy/BaseAMRMProxyTest.java | 5 +
.../amrmproxy/MockResourceManagerFacade.java | 6 +-
.../containermanager/TestContainerManager.java | 2 +-
17 files changed, 628 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index acff3d6..274ffc4 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -215,6 +215,9 @@ Release 2.8.0 - UNRELEASED
YARN-1643. Make ContainersMonitor support changing monitoring size of an
allocated container. (Meng Ding and Wangda Tan)
+ YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to
+ support container resizing. (Meng Ding via jianhe)
+
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
index 6cdf87f..338198b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
@@ -68,7 +68,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
failoverThread = createAndStartFailoverThread();
NodeStatus status =
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
- null, null, null, null);
+ null, null, null, null, null);
NodeHeartbeatRequest request2 =
NodeHeartbeatRequest.newInstance(status, null, null,null);
resourceTracker.nodeHeartbeat(request2);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 1498a0c..38fbc82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -70,4 +71,7 @@ public interface NodeHeartbeatResponse {
boolean getAreNodeLabelsAcceptedByRM();
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
+
+ List<Container> getContainersToDecrease();
+ void addAllContainersToDecrease(List<Container> containersToDecrease);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index e27d8ca..12c5230 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -27,12 +27,15 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
@@ -58,7 +61,9 @@ public class NodeHeartbeatResponsePBImpl extends
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
-
+
+ private List<Container> containersToDecrease = null;
+
public NodeHeartbeatResponsePBImpl() {
builder = NodeHeartbeatResponseProto.newBuilder();
}
@@ -96,6 +101,9 @@ public class NodeHeartbeatResponsePBImpl extends
if (this.systemCredentials != null) {
addSystemCredentialsToProto();
}
+ if (this.containersToDecrease != null) {
+ addContainersToDecreaseToProto();
+ }
}
private void addSystemCredentialsToProto() {
@@ -408,6 +416,64 @@ public class NodeHeartbeatResponsePBImpl extends
builder.addAllApplicationsToCleanup(iterable);
}
+ private void initContainersToDecrease() {
+ if (this.containersToDecrease != null) {
+ return;
+ }
+ NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+ List<ContainerProto> list = p.getContainersToDecreaseList();
+ this.containersToDecrease = new ArrayList<>();
+
+ for (ContainerProto c : list) {
+ this.containersToDecrease.add(convertFromProtoFormat(c));
+ }
+ }
+
+ @Override
+ public List<Container> getContainersToDecrease() {
+ initContainersToDecrease();
+ return this.containersToDecrease;
+ }
+
+ @Override
+ public void addAllContainersToDecrease(
+ final List<Container> containersToDecrease) {
+ if (containersToDecrease == null) {
+ return;
+ }
+ initContainersToDecrease();
+ this.containersToDecrease.addAll(containersToDecrease);
+ }
+
+ private void addContainersToDecreaseToProto() {
+ maybeInitBuilder();
+ builder.clearContainersToDecrease();
+ if (this.containersToDecrease == null) {
+ return;
+ }
+ Iterable<ContainerProto> iterable = new
+ Iterable<ContainerProto>() {
+ @Override
+ public Iterator<ContainerProto> iterator() {
+ return new Iterator<ContainerProto>() {
+ private Iterator<Container> iter = containersToDecrease.iterator();
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+ @Override
+ public ContainerProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllContainersToDecrease(iterable);
+ }
@Override
public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
@@ -484,6 +550,14 @@ public class NodeHeartbeatResponsePBImpl extends
return ((MasterKeyPBImpl) t).getProto();
}
+ private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
+ return new ContainerPBImpl(p);
+ }
+
+ private ContainerProto convertToProtoFormat(Container t) {
+ return ((ContainerPBImpl) t).getProto();
+ }
+
@Override
public boolean getAreNodeLabelsAcceptedByRM() {
NodeHeartbeatResponseProtoOrBuilder p =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
index 7b8262f..2d62db5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
@@ -48,6 +49,7 @@ public abstract class NodeStatus {
* @param nodeHealthStatus Health status of the node.
* @param containersUtilization Utilization of the containers in this node.
* @param nodeUtilization Utilization of the node.
+ * @param increasedContainers Containers whose resource has been increased.
* @return New {@code NodeStatus} with the provided information.
*/
public static NodeStatus newInstance(NodeId nodeId, int responseId,
@@ -55,7 +57,8 @@ public abstract class NodeStatus {
List<ApplicationId> keepAliveApplications,
NodeHealthStatus nodeHealthStatus,
ResourceUtilization containersUtilization,
- ResourceUtilization nodeUtilization) {
+ ResourceUtilization nodeUtilization,
+ List<Container> increasedContainers) {
NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
nodeStatus.setResponseId(responseId);
nodeStatus.setNodeId(nodeId);
@@ -64,6 +67,7 @@ public abstract class NodeStatus {
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
nodeStatus.setContainersUtilization(containersUtilization);
nodeStatus.setNodeUtilization(nodeUtilization);
+ nodeStatus.setIncreasedContainers(increasedContainers);
return nodeStatus;
}
@@ -108,4 +112,13 @@ public abstract class NodeStatus {
@Unstable
public abstract void setNodeUtilization(
ResourceUtilization nodeUtilization);
+
+ @Public
+ @Unstable
+ public abstract List<Container> getIncreasedContainers();
+
+ @Private
+ @Unstable
+ public abstract void setIncreasedContainers(
+ List<Container> increasedContainers);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
index 7d4e83f..e34451d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
@@ -24,13 +24,16 @@ import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
@@ -49,7 +52,8 @@ public class NodeStatusPBImpl extends NodeStatus {
private List<ContainerStatus> containers = null;
private NodeHealthStatus nodeHealthStatus = null;
private List<ApplicationId> keepAliveApplications = null;
-
+ private List<Container> increasedContainers = null;
+
public NodeStatusPBImpl() {
builder = NodeStatusProto.newBuilder();
}
@@ -79,6 +83,9 @@ public class NodeStatusPBImpl extends NodeStatus {
if (this.keepAliveApplications != null) {
addKeepAliveApplicationsToProto();
}
+ if (this.increasedContainers != null) {
+ addIncreasedContainersToProto();
+ }
}
private synchronized void mergeLocalToProto() {
@@ -165,6 +172,37 @@ public class NodeStatusPBImpl extends NodeStatus {
builder.addAllKeepAliveApplications(iterable);
}
+ private synchronized void addIncreasedContainersToProto() {
+ maybeInitBuilder();
+ builder.clearIncreasedContainers();
+ if (increasedContainers == null) {
+ return;
+ }
+ Iterable<ContainerProto> iterable = new
+ Iterable<ContainerProto>() {
+ @Override
+ public Iterator<ContainerProto> iterator() {
+ return new Iterator<ContainerProto>() {
+ private Iterator<Container> iter =
+ increasedContainers.iterator();
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+ @Override
+ public ContainerProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllIncreasedContainers(iterable);
+ }
+
@Override
public int hashCode() {
return getProto().hashCode();
@@ -336,6 +374,31 @@ public class NodeStatusPBImpl extends NodeStatus {
.setNodeUtilization(convertToProtoFormat(nodeUtilization));
}
+ @Override
+ public synchronized List<Container> getIncreasedContainers() {
+ if (increasedContainers != null) {
+ return increasedContainers;
+ }
+ NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
+ List<ContainerProto> list = p.getIncreasedContainersList();
+ this.increasedContainers = new ArrayList<>();
+ for (ContainerProto c : list) {
+ this.increasedContainers.add(convertFromProtoFormat(c));
+ }
+ return this.increasedContainers;
+ }
+
+ @Override
+ public synchronized void setIncreasedContainers(
+ List<Container> increasedContainers) {
+ maybeInitBuilder();
+ if (increasedContainers == null) {
+ builder.clearIncreasedContainers();
+ return;
+ }
+ this.increasedContainers = increasedContainers;
+ }
+
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
return ((NodeIdPBImpl)nodeId).getProto();
}
@@ -377,4 +440,14 @@ public class NodeStatusPBImpl extends NodeStatus {
ResourceUtilizationProto p) {
return new ResourceUtilizationPBImpl(p);
}
+
+ private ContainerPBImpl convertFromProtoFormat(
+ ContainerProto c) {
+ return new ContainerPBImpl(c);
+ }
+
+ private ContainerProto convertToProtoFormat(
+ Container c) {
+ return ((ContainerPBImpl)c).getProto();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 901051f..b161f5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -38,6 +38,7 @@ message NodeStatusProto {
repeated ApplicationIdProto keep_alive_applications = 5;
optional ResourceUtilizationProto containers_utilization = 6;
optional ResourceUtilizationProto node_utilization = 7;
+ repeated ContainerProto increased_containers = 8;
}
message MasterKeyProto {
@@ -60,4 +61,4 @@ message ResourceUtilizationProto {
optional int32 pmem = 1;
optional int32 vmem = 2;
optional float cpu = 3;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index c122b2a..2db8919 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -82,6 +82,7 @@ message NodeHeartbeatResponseProto {
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
+ repeated ContainerProto containers_to_decrease = 12;
}
message SystemCredentialsForAppsProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index d9eeb9d..c9427dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -168,6 +169,20 @@ public class TestYarnServerApiClasses {
assertTrue(copy.getAreNodeLabelsAcceptedByRM());
}
+ @Test
+ public void testNodeHeartbeatResponsePBImplWithDecreasedContainers() {
+ NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();
+ original.addAllContainersToDecrease(
+ Arrays.asList(getDecreasedContainer(1, 2, 2048, 2),
+ getDecreasedContainer(2, 3, 1024, 1)));
+ NodeHeartbeatResponsePBImpl copy =
+ new NodeHeartbeatResponsePBImpl(original.getProto());
+ assertEquals(1, copy.getContainersToDecrease().get(0)
+ .getId().getContainerId());
+ assertEquals(1024, copy.getContainersToDecrease().get(1)
+ .getResource().getMemory());
+ }
+
/**
* Test RegisterNodeManagerRequestPBImpl.
*/
@@ -244,6 +259,9 @@ public class TestYarnServerApiClasses {
original.setNodeHealthStatus(getNodeHealthStatus());
original.setNodeId(getNodeId());
original.setResponseId(1);
+ original.setIncreasedContainers(
+ Arrays.asList(getIncreasedContainer(1, 2, 2048, 2),
+ getIncreasedContainer(2, 3, 4096, 3)));
NodeStatusPBImpl copy = new NodeStatusPBImpl(original.getProto());
assertEquals(3L, copy.getContainersStatuses().get(1).getContainerId()
@@ -252,7 +270,10 @@ public class TestYarnServerApiClasses {
assertEquals(1000, copy.getNodeHealthStatus().getLastHealthReportTime());
assertEquals(9090, copy.getNodeId().getPort());
assertEquals(1, copy.getResponseId());
-
+ assertEquals(1, copy.getIncreasedContainers().get(0)
+ .getId().getContainerId());
+ assertEquals(4096, copy.getIncreasedContainers().get(1)
+ .getResource().getMemory());
}
@Test
@@ -347,6 +368,22 @@ public class TestYarnServerApiClasses {
return new ApplicationIdPBImpl(appId.getProto());
}
+ private Container getDecreasedContainer(int containerID,
+ int appAttemptId, int memory, int vCores) {
+ ContainerId containerId = getContainerId(containerID, appAttemptId);
+ Resource capability = Resource.newInstance(memory, vCores);
+ return Container.newInstance(
+ containerId, null, null, capability, null, null);
+ }
+
+ private Container getIncreasedContainer(int containerID,
+ int appAttemptId, int memory, int vCores) {
+ ContainerId containerId = getContainerId(containerID, appAttemptId);
+ Resource capability = Resource.newInstance(memory, vCores);
+ return Container.newInstance(
+ containerId, null, null, capability, null, null);
+ }
+
private NodeStatus getNodeStatus() {
NodeStatus status = recordFactory.newRecordInstance(NodeStatus.class);
status.setContainersStatuses(new ArrayList<ContainerStatus>());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 52d937b..9c2d1fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -62,6 +62,9 @@ public interface Context {
ConcurrentMap<ContainerId, Container> getContainers();
+ ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container>
+ getIncreasedContainers();
+
NMContainerTokenSecretManager getContainerTokenSecretManager();
NMTokenSecretManagerInNM getNMTokenSecretManager();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 3cf9f1a..184f489 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -439,6 +439,10 @@ public class NodeManager extends CompositeService
protected final ConcurrentMap<ContainerId, Container> containers =
new ConcurrentSkipListMap<ContainerId, Container>();
+ protected final ConcurrentMap<ContainerId,
+ org.apache.hadoop.yarn.api.records.Container> increasedContainers =
+ new ConcurrentHashMap<>();
+
private final NMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInNM nmTokenSecretManager;
private ContainerManagementProtocol containerManager;
@@ -493,6 +497,12 @@ public class NodeManager extends CompositeService
}
@Override
+ public ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container>
+ getIncreasedContainers() {
+ return this.increasedContainers;
+ }
+
+ @Override
public NMContainerTokenSecretManager getContainerTokenSecretManager() {
return this.containerTokenSecretManager;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index aa51e5c..f8ce90f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -310,18 +310,28 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@VisibleForTesting
protected void registerWithRM()
throws YarnException, IOException {
- List<NMContainerStatus> containerReports = getNMContainerStatuses();
+ RegisterNodeManagerResponse regNMResponse;
Set<NodeLabel> nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration();
- RegisterNodeManagerRequest request =
- RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
- nodeManagerVersionId, containerReports, getRunningApplications(),
- nodeLabels);
- if (containerReports != null) {
- LOG.info("Registering with RM using containers :" + containerReports);
+
+ // Synchronize NM-RM registration with
+ // ContainerManagerImpl#increaseContainersResource and
+ // ContainerManagerImpl#startContainers to avoid race condition
+ // during RM recovery
+ synchronized (this.context) {
+ List<NMContainerStatus> containerReports = getNMContainerStatuses();
+ RegisterNodeManagerRequest request =
+ RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
+ nodeManagerVersionId, containerReports, getRunningApplications(),
+ nodeLabels);
+ if (containerReports != null) {
+ LOG.info("Registering with RM using containers :" + containerReports);
+ }
+ regNMResponse =
+ resourceTracker.registerNodeManager(request);
+ // Make sure rmIdentifier is set before we release the lock
+ this.rmIdentifier = regNMResponse.getRMIdentifier();
}
- RegisterNodeManagerResponse regNMResponse =
- resourceTracker.registerNodeManager(request);
- this.rmIdentifier = regNMResponse.getRMIdentifier();
+
// if the Resource Manager instructs NM to shutdown.
if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
String message =
@@ -418,10 +428,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
List<ContainerStatus> containersStatuses = getContainerStatuses();
ResourceUtilization containersUtilization = getContainersUtilization();
ResourceUtilization nodeUtilization = getNodeUtilization();
+ List<org.apache.hadoop.yarn.api.records.Container> increasedContainers
+ = getIncreasedContainers();
NodeStatus nodeStatus =
NodeStatus.newInstance(nodeId, responseId, containersStatuses,
createKeepAliveApplicationList(), nodeHealthStatus,
- containersUtilization, nodeUtilization);
+ containersUtilization, nodeUtilization, increasedContainers);
return nodeStatus;
}
@@ -448,6 +460,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
return nodeResourceMonitor.getUtilization();
}
+ /* Get the containers whose resource has been increased since last
+ * NM-RM heartbeat.
+ */
+ private List<org.apache.hadoop.yarn.api.records.Container>
+ getIncreasedContainers() {
+ List<org.apache.hadoop.yarn.api.records.Container>
+ increasedContainers = new ArrayList<>(
+ this.context.getIncreasedContainers().values());
+ for (org.apache.hadoop.yarn.api.records.Container
+ container : increasedContainers) {
+ this.context.getIncreasedContainers().remove(container.getId());
+ }
+ return increasedContainers;
+ }
+
// Iterate through the NMContext and clone and get all the containers'
// statuses. If it's a completed container, add into the
// recentlyStoppedContainers collections.
@@ -765,6 +792,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
((NMContext) context)
.setSystemCrendentialsForApps(parseCredentials(systemCredentials));
}
+
+ List<org.apache.hadoop.yarn.api.records.Container>
+ containersToDecrease = response.getContainersToDecrease();
+ if (!containersToDecrease.isEmpty()) {
+ dispatcher.getEventHandler().handle(
+ new CMgrDecreaseContainersResourceEvent(containersToDecrease)
+ );
+ }
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 4f2ccbe..868d8d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -563,8 +563,7 @@ public class ContainerManagerImpl extends CompositeService implements
List<ApplicationId> appIds =
new ArrayList<ApplicationId>(applications.keySet());
- this.handle(
- new CMgrCompletedAppsEvent(appIds,
+ this.handle(new CMgrCompletedAppsEvent(appIds,
CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN));
LOG.info("Waiting for Applications to be Finished");
@@ -584,8 +583,8 @@ public class ContainerManagerImpl extends CompositeService implements
if (applications.isEmpty()) {
LOG.info("All applications in FINISHED state");
} else {
- LOG.info("Done waiting for Applications to be Finished. Still alive: " +
- applications.keySet());
+ LOG.info("Done waiting for Applications to be Finished. Still alive: "
+ + applications.keySet());
}
}
@@ -759,13 +758,12 @@ public class ContainerManagerImpl extends CompositeService implements
* Start a list of containers on this NodeManager.
*/
@Override
- public StartContainersResponse
- startContainers(StartContainersRequest requests) throws YarnException,
- IOException {
+ public StartContainersResponse startContainers(
+ StartContainersRequest requests) throws YarnException, IOException {
if (blockNewContainerRequests.get()) {
throw new NMNotYetReadyException(
- "Rejecting new containers as NodeManager has not"
- + " yet connected with ResourceManager");
+ "Rejecting new containers as NodeManager has not"
+ + " yet connected with ResourceManager");
}
UserGroupInformation remoteUgi = getRemoteUgi();
NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
@@ -773,42 +771,50 @@ public class ContainerManagerImpl extends CompositeService implements
List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
Map<ContainerId, SerializedException> failedContainers =
new HashMap<ContainerId, SerializedException>();
- for (StartContainerRequest request : requests.getStartContainerRequests()) {
- ContainerId containerId = null;
- try {
- if (request.getContainerToken() == null ||
- request.getContainerToken().getIdentifier() == null) {
- throw new IOException(INVALID_CONTAINERTOKEN_MSG);
- }
- ContainerTokenIdentifier containerTokenIdentifier =
- BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
- verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
- containerTokenIdentifier);
- containerId = containerTokenIdentifier.getContainerID();
-
- // Initialize the AMRMProxy service instance only if the container is of
- // type AM and if the AMRMProxy service is enabled
- if (isARMRMProxyEnabled()
- && containerTokenIdentifier.getContainerType().equals(
- ContainerType.APPLICATION_MASTER)) {
- this.amrmProxyService.processApplicationStartRequest(request);
- }
+ // Synchronize with NodeStatusUpdaterImpl#registerWithRM
+ // to avoid race condition during NM-RM resync (due to RM restart) while a
+ // container is being started, in particular when the container has not yet
+ // been added to the containers map in NMContext.
+ synchronized (this.context) {
+ for (StartContainerRequest request : requests
+ .getStartContainerRequests()) {
+ ContainerId containerId = null;
+ try {
+ if (request.getContainerToken() == null
+ || request.getContainerToken().getIdentifier() == null) {
+ throw new IOException(INVALID_CONTAINERTOKEN_MSG);
+ }
- startContainerInternal(nmTokenIdentifier,
- containerTokenIdentifier, request);
- succeededContainers.add(containerId);
- } catch (YarnException e) {
- failedContainers.put(containerId, SerializedException.newInstance(e));
- } catch (InvalidToken ie) {
- failedContainers.put(containerId, SerializedException.newInstance(ie));
- throw ie;
- } catch (IOException e) {
- throw RPCUtil.getRemoteException(e);
+ ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils
+ .newContainerTokenIdentifier(request.getContainerToken());
+ verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
+ containerTokenIdentifier);
+ containerId = containerTokenIdentifier.getContainerID();
+
+ // Initialize the AMRMProxy service instance only if the container is of
+ // type AM and if the AMRMProxy service is enabled
+ if (isARMRMProxyEnabled() && containerTokenIdentifier
+ .getContainerType().equals(ContainerType.APPLICATION_MASTER)) {
+ this.amrmProxyService.processApplicationStartRequest(request);
+ }
+
+ startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
+ request);
+ succeededContainers.add(containerId);
+ } catch (YarnException e) {
+ failedContainers.put(containerId, SerializedException.newInstance(e));
+ } catch (InvalidToken ie) {
+ failedContainers
+ .put(containerId, SerializedException.newInstance(ie));
+ throw ie;
+ } catch (IOException e) {
+ throw RPCUtil.getRemoteException(e);
+ }
}
+ return StartContainersResponse
+ .newInstance(getAuxServiceMetaData(), succeededContainers,
+ failedContainers);
}
-
- return StartContainersResponse.newInstance(getAuxServiceMetaData(),
- succeededContainers, failedContainers);
}
private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
@@ -959,7 +965,7 @@ public class ContainerManagerImpl extends CompositeService implements
InvalidToken {
byte[] password =
context.getContainerTokenSecretManager().retrievePassword(
- containerTokenIdentifier);
+ containerTokenIdentifier);
byte[] tokenPass = token.getPassword().array();
if (password == null || tokenPass == null
|| !Arrays.equals(password, tokenPass)) {
@@ -989,32 +995,39 @@ public class ContainerManagerImpl extends CompositeService implements
= new ArrayList<ContainerId>();
Map<ContainerId, SerializedException> failedContainers =
new HashMap<ContainerId, SerializedException>();
- // Process container resource increase requests
- for (org.apache.hadoop.yarn.api.records.Token token :
- requests.getContainersToIncrease()) {
- ContainerId containerId = null;
- try {
- if (token.getIdentifier() == null) {
- throw new IOException(INVALID_CONTAINERTOKEN_MSG);
+ // Synchronize with NodeStatusUpdaterImpl#registerWithRM
+ // to avoid race condition during NM-RM resync (due to RM restart) while a
+ // container resource is being increased in NM, in particular when the
+ // increased container has not yet been added to the increasedContainers
+ // map in NMContext.
+ synchronized (this.context) {
+ // Process container resource increase requests
+ for (org.apache.hadoop.yarn.api.records.Token token :
+ requests.getContainersToIncrease()) {
+ ContainerId containerId = null;
+ try {
+ if (token.getIdentifier() == null) {
+ throw new IOException(INVALID_CONTAINERTOKEN_MSG);
+ }
+ ContainerTokenIdentifier containerTokenIdentifier =
+ BuilderUtils.newContainerTokenIdentifier(token);
+ verifyAndGetContainerTokenIdentifier(token,
+ containerTokenIdentifier);
+ authorizeStartAndResourceIncreaseRequest(
+ nmTokenIdentifier, containerTokenIdentifier, false);
+ containerId = containerTokenIdentifier.getContainerID();
+ // Reuse the startContainer logic to update NMToken,
+ // as container resource increase request will have come with
+ // an updated NMToken.
+ updateNMTokenIdentifier(nmTokenIdentifier);
+ Resource resource = containerTokenIdentifier.getResource();
+ changeContainerResourceInternal(containerId, resource, true);
+ successfullyIncreasedContainers.add(containerId);
+ } catch (YarnException | InvalidToken e) {
+ failedContainers.put(containerId, SerializedException.newInstance(e));
+ } catch (IOException e) {
+ throw RPCUtil.getRemoteException(e);
}
- ContainerTokenIdentifier containerTokenIdentifier =
- BuilderUtils.newContainerTokenIdentifier(token);
- verifyAndGetContainerTokenIdentifier(token,
- containerTokenIdentifier);
- authorizeStartAndResourceIncreaseRequest(
- nmTokenIdentifier, containerTokenIdentifier, false);
- containerId = containerTokenIdentifier.getContainerID();
- // Reuse the startContainer logic to update NMToken,
- // as container resource increase request will have come with
- // an updated NMToken.
- updateNMTokenIdentifier(nmTokenIdentifier);
- Resource resource = containerTokenIdentifier.getResource();
- changeContainerResourceInternal(containerId, resource, true);
- successfullyIncreasedContainers.add(containerId);
- } catch (YarnException | InvalidToken e) {
- failedContainers.put(containerId, SerializedException.newInstance(e));
- } catch (IOException e) {
- throw RPCUtil.getRemoteException(e);
}
}
return IncreaseContainersResourceResponse.newInstance(
@@ -1075,6 +1088,16 @@ public class ContainerManagerImpl extends CompositeService implements
+ " is not smaller than the current resource "
+ currentResource.toString());
}
+ if (increase) {
+ org.apache.hadoop.yarn.api.records.Container increasedContainer =
+ org.apache.hadoop.yarn.api.records.Container.newInstance(
+ containerId, null, null, targetResource, null, null);
+ if (context.getIncreasedContainers().putIfAbsent(containerId,
+ increasedContainer) != null){
+ throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ + " resource is being increased.");
+ }
+ }
this.readLock.lock();
try {
if (!serviceStopped) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
index c22d475..4250ac3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
@@ -18,21 +18,35 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -41,8 +55,13 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
@@ -50,6 +69,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@@ -57,12 +78,15 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -87,7 +111,10 @@ public class TestNodeManagerResync {
private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
private final NodeManagerEvent resyncEvent =
new NodeManagerEvent(NodeManagerEventType.RESYNC);
+  /** RM identifier embedded in the container tokens built by these tests. */
+  private static final long DUMMY_RM_IDENTIFIER = 1234;
+  /** Shared logger; final so it cannot be reassigned by accident. */
+  protected static final Log LOG = LogFactory
+      .getLog(TestNodeManagerResync.class);
@Before
public void setup() throws UnsupportedFileSystemException {
@@ -209,6 +236,32 @@ public class TestNodeManagerResync {
nm.stop();
}
+  /**
+   * Verifies that an in-flight container resource increase and an NM-RM
+   * resync (triggered by a RESYNC event) do not interleave: the status
+   * updater installed by TestNodeManager4 checks the container capability
+   * before and after re-registering and records any mismatch in
+   * assertionFailedInThread.
+   */
+  @SuppressWarnings("unchecked")
+  @Test(timeout=60000)
+  public void testContainerResourceIncreaseIsSynchronizedWithRMResync()
+      throws IOException, InterruptedException, YarnException {
+    TestNodeManager4 nm = new TestNodeManager4();
+    YarnConfiguration conf = createNMConfig();
+    conf.setBoolean(
+        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+    nm.init(conf);
+    nm.start();
+    try {
+      // Start a container and make sure it is in RUNNING state
+      nm.startContainer();
+      // Simulate a container resource increase in a separate thread
+      nm.increaseContainersResource();
+      // Simulate RM restart by sending a RESYNC event
+      LOG.info("Sending out RESYNC event");
+      nm.getNMDispatcher().getEventHandler().handle(
+          new NodeManagerEvent(NodeManagerEventType.RESYNC));
+      try {
+        syncBarrier.await();
+      } catch (BrokenBarrierException e) {
+        e.printStackTrace();
+      }
+      Assert.assertFalse(assertionFailedInThread.get());
+    } finally {
+      // Always stop the NM so a failed assertion does not leak its services
+      // and the long-running container into subsequent tests.
+      nm.stop();
+    }
+  }
// This is to test when NM gets the resync response from last heart beat, it
// should be able to send the already-sent-via-last-heart-beat container
@@ -588,6 +641,211 @@ public class TestNodeManagerResync {
}
}}
+  /**
+   * NodeManager used by
+   * testContainerResourceIncreaseIsSynchronizedWithRMResync(). Its container
+   * manager skips authorization checks and sleeps two seconds inside
+   * authorizeStartAndResourceIncreaseRequest(); its status updater checks the
+   * container's capability before and after re-registering with the RM.
+   */
+  class TestNodeManager4 extends NodeManager {
+
+    /** Thread started by increaseContainersResource(), or null. */
+    private volatile Thread increaseContainerResourceThread = null;
+
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      return new TestNodeStatusUpdaterImpl4(context, dispatcher,
+          healthChecker, metrics);
+    }
+
+    @Override
+    protected ContainerManagerImpl createContainerManager(Context context,
+        ContainerExecutor exec, DeletionService del,
+        NodeStatusUpdater nodeStatusUpdater,
+        ApplicationACLsManager aclsManager,
+        LocalDirsHandlerService dirsHandler) {
+      return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
+          metrics, dirsHandler) {
+        @Override
+        public void
+            setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+          // do nothing
+        }
+
+        @Override
+        protected void authorizeGetAndStopContainerRequest(
+            ContainerId containerId, Container container,
+            boolean stopRequest, NMTokenIdentifier identifier)
+            throws YarnException {
+          // do nothing
+        }
+
+        @Override
+        protected void authorizeUser(UserGroupInformation remoteUgi,
+            NMTokenIdentifier nmTokenIdentifier) {
+          // do nothing
+        }
+
+        @Override
+        protected void authorizeStartAndResourceIncreaseRequest(
+            NMTokenIdentifier nmTokenIdentifier,
+            ContainerTokenIdentifier containerTokenIdentifier,
+            boolean startRequest) throws YarnException {
+          try {
+            // Sleep 2 seconds to simulate a prolonged increase action.
+            // If a RESYNC event is sent by RM during this time, the
+            // resync action should block until the increase action is
+            // completed.
+            // See testContainerResourceIncreaseIsSynchronizedWithRMResync()
+            Thread.sleep(2000);
+          } catch (InterruptedException e) {
+            LOG.warn("Interrupted while simulating a slow increase", e);
+            // Restore the interrupt status instead of swallowing it.
+            Thread.currentThread().interrupt();
+          }
+        }
+
+        @Override
+        protected void updateNMTokenIdentifier(
+            NMTokenIdentifier nmTokenIdentifier)
+            throws SecretManager.InvalidToken {
+          // Do nothing
+        }
+
+        @Override
+        public Map<String, ByteBuffer> getAuxServiceMetaData() {
+          return new HashMap<>();
+        }
+
+        @Override
+        protected NMTokenIdentifier selectNMTokenIdentifier(
+            UserGroupInformation remoteUgi) {
+          return new NMTokenIdentifier();
+        }
+      };
+    }
+
+    /**
+     * Starts a container running a long sleep script and waits until it is
+     * in RUNNING state.
+     */
+    public void startContainer()
+        throws IOException, InterruptedException, YarnException {
+      LOG.info("Start a container and wait until it is in RUNNING state");
+      File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
+      // try-with-resources so the writer is closed even if a write fails.
+      try (PrintWriter fileWriter = new PrintWriter(scriptFile)) {
+        if (Shell.WINDOWS) {
+          fileWriter.println("@ping -n 100 127.0.0.1 >nul");
+        } else {
+          fileWriter.write("\numask 0");
+          fileWriter.write("\nexec sleep 100");
+        }
+      }
+      ContainerLaunchContext containerLaunchContext =
+          recordFactory.newRecordInstance(ContainerLaunchContext.class);
+      URL resource_alpha =
+          ConverterUtils.getYarnUrlFromPath(localFS
+              .makeQualified(new Path(scriptFile.getAbsolutePath())));
+      LocalResource rsrc_alpha =
+          recordFactory.newRecordInstance(LocalResource.class);
+      rsrc_alpha.setResource(resource_alpha);
+      rsrc_alpha.setSize(-1);
+      rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+      rsrc_alpha.setType(LocalResourceType.FILE);
+      rsrc_alpha.setTimestamp(scriptFile.lastModified());
+      String destinationFile = "dest_file";
+      Map<String, LocalResource> localResources =
+          new HashMap<String, LocalResource>();
+      localResources.put(destinationFile, rsrc_alpha);
+      containerLaunchContext.setLocalResources(localResources);
+      List<String> commands =
+          Arrays.asList(Shell.getRunScriptCommand(scriptFile));
+      containerLaunchContext.setCommands(commands);
+      Resource resource = Resource.newInstance(1024, 1);
+      StartContainerRequest scRequest =
+          StartContainerRequest.newInstance(
+              containerLaunchContext,
+              getContainerToken(resource));
+      List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+      list.add(scRequest);
+      StartContainersRequest allRequests =
+          StartContainersRequest.newInstance(list);
+      getContainerManager().startContainers(allRequests);
+      // Make sure the container reaches RUNNING state
+      ContainerId cId = TestContainerManager.createContainerId(0);
+      BaseContainerManagerTest.waitForNMContainerState(
+          getContainerManager(), cId,
+          org.apache.hadoop.yarn.server.nodemanager.
+              containermanager.container.ContainerState.RUNNING);
+    }
+
+    /** Increases the container's resource in a separate thread. */
+    public void increaseContainersResource()
+        throws InterruptedException {
+      LOG.info("Increase a container resource in a separate thread");
+      increaseContainerResourceThread = new IncreaseContainersResourceThread();
+      increaseContainerResourceThread.start();
+    }
+
+    /**
+     * Waits (bounded) for the thread started by increaseContainersResource(),
+     * if any, so that failures it records are visible before the test thread
+     * is released from the barrier.
+     */
+    private void awaitIncreaseThread() {
+      Thread increaseThread = increaseContainerResourceThread;
+      if (increaseThread == null) {
+        return;
+      }
+      try {
+        // Bounded so a stuck increase cannot hang the resync path forever.
+        increaseThread.join(10000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    class TestNodeStatusUpdaterImpl4 extends MockNodeStatusUpdater {
+
+      public TestNodeStatusUpdaterImpl4(Context context, Dispatcher dispatcher,
+          NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+        super(context, dispatcher, healthChecker, metrics);
+      }
+
+      @Override
+      protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
+        try {
+          try {
+            // Check status before registerWithRM
+            List<ContainerId> containerIds = new ArrayList<>();
+            ContainerId cId = TestContainerManager.createContainerId(0);
+            containerIds.add(cId);
+            GetContainerStatusesRequest gcsRequest =
+                GetContainerStatusesRequest.newInstance(containerIds);
+            ContainerStatus containerStatus = getContainerManager()
+                .getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
+            assertEquals(Resource.newInstance(1024, 1),
+                containerStatus.getCapability());
+            // Call the actual rebootNodeStatusUpdaterAndRegisterWithRM().
+            // This function should be synchronized with
+            // increaseContainersResource().
+            super.rebootNodeStatusUpdaterAndRegisterWithRM();
+            // Check status after registerWithRM
+            containerStatus = getContainerManager()
+                .getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
+            assertEquals(Resource.newInstance(4096, 2),
+                containerStatus.getCapability());
+          } catch (AssertionError ae) {
+            ae.printStackTrace();
+            assertionFailedInThread.set(true);
+          } finally {
+            // Let the increase thread finish (and record any failure) before
+            // releasing the test thread, which reads assertionFailedInThread.
+            awaitIncreaseThread();
+            syncBarrier.await();
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+
+    class IncreaseContainersResourceThread extends Thread {
+      @Override
+      public void run() {
+        // Construct container resource increase request
+        List<Token> increaseTokens = new ArrayList<Token>();
+        // Add increase request.
+        Resource targetResource = Resource.newInstance(4096, 2);
+        try {
+          increaseTokens.add(getContainerToken(targetResource));
+          IncreaseContainersResourceRequest increaseRequest =
+              IncreaseContainersResourceRequest.newInstance(increaseTokens);
+          IncreaseContainersResourceResponse increaseResponse =
+              getContainerManager()
+                  .increaseContainersResource(increaseRequest);
+          Assert.assertEquals(
+              1, increaseResponse.getSuccessfullyIncreasedContainers()
+                  .size());
+          Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty());
+        } catch (AssertionError ae) {
+          // An AssertionError here would only kill this thread; record it so
+          // the test method fails.
+          LOG.error("Container resource increase assertion failed", ae);
+          assertionFailedInThread.set(true);
+        } catch (Exception e) {
+          LOG.error("Container resource increase failed", e);
+          assertionFailedInThread.set(true);
+        }
+      }
+    }
+
+    private Token getContainerToken(Resource resource) throws IOException {
+      ContainerId cId = TestContainerManager.createContainerId(0);
+      return TestContainerManager.createContainerToken(
+          cId, DUMMY_RM_IDENTIFIER,
+          getNMContext().getNodeId(), user, resource,
+          getNMContext().getContainerTokenSecretManager(), null);
+    }
+  }
+
public static NMContainerStatus createNMContainerStatus(int id,
ContainerState containerState) {
ApplicationId applicationId = ApplicationId.newInstance(0, 1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 964379a..9bc23f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -620,6 +620,11 @@ public abstract class BaseAMRMProxyTest {
}
@Override
+ public ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container> getIncreasedContainers() {
+ // Stub: this mock context does not track increased containers and
+ // returns null rather than a map; callers must not dereference it.
+ return null;
+ }
+
+ @Override
public NMContainerTokenSecretManager getContainerTokenSecretManager() {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
index 7573a7a..f482784 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
@@ -93,8 +93,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
-import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -292,8 +290,8 @@ public class MockResourceManagerFacade implements
new ArrayList<ContainerStatus>(), containerList,
new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
new ArrayList<NMToken>(),
- new ArrayList<ContainerResourceIncrease>(),
- new ArrayList<ContainerResourceDecrease>());
+ new ArrayList<Container>(),
+ new ArrayList<Container>());
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1e4034a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 2ea9146..3fb4112 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -108,7 +108,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
super.setup();
}
- private ContainerId createContainerId(int id) {
+ public static ContainerId createContainerId(int id) {
ApplicationId appId = ApplicationId.newInstance(0, 0);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);