You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/02/23 20:37:13 UTC
[25/52] [abbrv] hadoop git commit: YARN-3194. RM should handle
NMContainerStatuses sent by NM while registering if NM is Reconnected node.
Contributed by Rohith
YARN-3194. RM should handle NMContainerStatuses sent by NM while registering if NM is Reconnected node. Contributed by Rohith
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a64dd3d2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a64dd3d2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a64dd3d2
Branch: refs/heads/HDFS-7285
Commit: a64dd3d24bfcb9af21eb63869924f6482b147fd3
Parents: 7ae5255
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Feb 20 15:08:48 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Feb 20 15:10:10 2015 +0000
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../resourcemanager/ResourceTrackerService.java | 9 +-
.../resourcemanager/rmnode/RMNodeImpl.java | 111 ++++++++++-------
.../rmnode/RMNodeReconnectEvent.java | 9 +-
.../resourcemanager/TestApplicationCleanup.java | 121 +++++++++++++++++++
.../resourcemanager/TestRMNodeTransitions.java | 4 +-
6 files changed, 209 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a64dd3d2/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index cac6680..8ec2409 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -626,6 +626,9 @@ Release 2.7.0 - UNRELEASED
YARN-933. Fixed InvalidStateTransitonException at FINAL_SAVING state in
RMApp. (Rohith Sharmaks via jianhe)
+ YARN-3194. RM should handle NMContainerStatuses sent by NM while
+ registering if NM is Reconnected node (Rohith via jlowe)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a64dd3d2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 61a0349..0de556b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -312,9 +312,12 @@ public class ResourceTrackerService extends AbstractService implements
} else {
LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId);
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeReconnectEvent(nodeId, rmNode,
- request.getRunningApplications()));
+ this.rmContext
+ .getDispatcher()
+ .getEventHandler()
+ .handle(
+ new RMNodeReconnectEvent(nodeId, rmNode, request
+ .getRunningApplications(), request.getNMContainerStatuses()));
}
// On every node manager register we will be clearing NMToken keys if
// present for any running application.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a64dd3d2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 1bc98b2..9701775 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -601,6 +601,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
rmNode.httpAddress = newNode.getHttpAddress();
rmNode.totalCapability = newNode.getTotalCapability();
+ handleNMContainerStatus(reconnectEvent.getNMContainerStatuses(), rmNode);
+
// Reset heartbeat ID since node just restarted.
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
}
@@ -622,6 +624,26 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
}
+
+    /**
+     * Converts the container reports carried by a reconnect event into
+     * {@link ContainerStatus} records and hands them to
+     * {@code rmnode.handleContainerStatus(...)}.
+     *
+     * @param nmContainerStatuses container reports taken from the reconnect
+     *          event; may be null, in which case nothing is processed
+     * @param rmnode the node whose container bookkeeping is updated
+     */
+    private void handleNMContainerStatus(
+        List<NMContainerStatus> nmContainerStatuses, RMNodeImpl rmnode) {
+      // RMNodeReconnectEvent keeps the list exactly as it was passed in, so a
+      // caller that supplied null would otherwise trigger an NPE below.
+      if (nmContainerStatuses == null) {
+        return;
+      }
+      List<ContainerStatus> containerStatuses =
+          new ArrayList<ContainerStatus>(nmContainerStatuses.size());
+      for (NMContainerStatus nmContainerStatus : nmContainerStatuses) {
+        containerStatuses.add(createContainerStatus(nmContainerStatus));
+      }
+      rmnode.handleContainerStatus(containerStatuses);
+    }
+
+    /**
+     * Builds a {@link ContainerStatus} from the container id, state,
+     * diagnostics and exit status of the given NM container report.
+     */
+    private ContainerStatus createContainerStatus(
+        NMContainerStatus remoteContainer) {
+      return ContainerStatus.newInstance(
+          remoteContainer.getContainerId(),
+          remoteContainer.getContainerState(),
+          remoteContainer.getDiagnostics(),
+          remoteContainer.getContainerExitStatus());
+    }
}
public static class UpdateNodeResourceWhenRunningTransition
@@ -747,49 +769,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
return NodeState.UNHEALTHY;
}
- // Filter the map to only obtain just launched containers and finished
- // containers.
- List<ContainerStatus> newlyLaunchedContainers =
- new ArrayList<ContainerStatus>();
- List<ContainerStatus> completedContainers =
- new ArrayList<ContainerStatus>();
- for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
- ContainerId containerId = remoteContainer.getContainerId();
-
- // Don't bother with containers already scheduled for cleanup, or for
- // applications already killed. The scheduler doens't need to know any
- // more about this container
- if (rmNode.containersToClean.contains(containerId)) {
- LOG.info("Container " + containerId + " already scheduled for " +
- "cleanup, no further processing");
- continue;
- }
- if (rmNode.finishedApplications.contains(containerId
- .getApplicationAttemptId().getApplicationId())) {
- LOG.info("Container " + containerId
- + " belongs to an application that is already killed,"
- + " no further processing");
- continue;
- }
+ rmNode.handleContainerStatus(statusEvent.getContainers());
- // Process running containers
- if (remoteContainer.getState() == ContainerState.RUNNING) {
- if (!rmNode.launchedContainers.contains(containerId)) {
- // Just launched container. RM knows about it the first time.
- rmNode.launchedContainers.add(containerId);
- newlyLaunchedContainers.add(remoteContainer);
- }
- } else {
- // A finished container
- rmNode.launchedContainers.remove(containerId);
- completedContainers.add(remoteContainer);
- }
- }
- if(newlyLaunchedContainers.size() != 0
- || completedContainers.size() != 0) {
- rmNode.nodeUpdateQueue.add(new UpdatedContainerInfo
- (newlyLaunchedContainers, completedContainers));
- }
if(rmNode.nextHeartBeat) {
rmNode.nextHeartBeat = false;
rmNode.context.getDispatcher().getEventHandler().handle(
@@ -874,4 +855,50 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
return nlm.getLabelsOnNode(nodeId);
}
+
+  /**
+   * Sorts the reported container statuses into newly launched and completed
+   * containers and, when either group is non-empty, queues both lists as a
+   * single {@link UpdatedContainerInfo} on {@code nodeUpdateQueue}.
+   *
+   * @param containerStatuses statuses reported for containers on this node
+   */
+  private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
+    List<ContainerStatus> justLaunched = new ArrayList<ContainerStatus>();
+    List<ContainerStatus> justFinished = new ArrayList<ContainerStatus>();
+
+    for (ContainerStatus status : containerStatuses) {
+      ContainerId id = status.getContainerId();
+
+      // Containers already queued for cleanup are not reported any further.
+      if (containersToClean.contains(id)) {
+        LOG.info("Container " + id
+            + " already scheduled for cleanup, no further processing");
+        continue;
+      }
+
+      // The same goes for containers whose application is listed in
+      // finishedApplications.
+      if (finishedApplications.contains(
+          id.getApplicationAttemptId().getApplicationId())) {
+        LOG.info("Container " + id + " belongs to an application that is"
+            + " already killed, no further processing");
+        continue;
+      }
+
+      if (status.getState() != ContainerState.RUNNING) {
+        // Any state other than RUNNING is treated as a finished container.
+        launchedContainers.remove(id);
+        justFinished.add(status);
+      } else if (!launchedContainers.contains(id)) {
+        // A running container seen for the first time: remember it.
+        launchedContainers.add(id);
+        justLaunched.add(status);
+      }
+    }
+
+    if (!justLaunched.isEmpty() || !justFinished.isEmpty()) {
+      nodeUpdateQueue.add(
+          new UpdatedContainerInfo(justLaunched, justFinished));
+    }
+  }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a64dd3d2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
index ebbac9a..0bea44b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeReconnectEvent.java
@@ -22,16 +22,19 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
public class RMNodeReconnectEvent extends RMNodeEvent {
private RMNode reconnectedNode;
private List<ApplicationId> runningApplications;
+ private List<NMContainerStatus> containerStatuses;
public RMNodeReconnectEvent(NodeId nodeId, RMNode newNode,
- List<ApplicationId> runningApps) {
+ List<ApplicationId> runningApps, List<NMContainerStatus> containerReports) {
super(nodeId, RMNodeEventType.RECONNECTED);
reconnectedNode = newNode;
runningApplications = runningApps;
+ containerStatuses = containerReports;
}
public RMNode getReconnectedNode() {
@@ -41,4 +44,8 @@ public class RMNodeReconnectEvent extends RMNodeEvent {
public List<ApplicationId> getRunningApplications() {
return runningApplications;
}
+
+ public List<NMContainerStatus> getNMContainerStatuses() {
+ return containerStatuses;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a64dd3d2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
index 891130f..6e08aeb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
@@ -28,7 +28,9 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -48,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.log4j.Level;
@@ -478,6 +481,124 @@ public class TestApplicationCleanup {
rm1.stop();
}
+  // The test verifies processing of NMContainerStatuses which are sent during
+  // NM registration.
+  // 1. Start the cluster (RM, NM), submit an app with 1024MB, launch and
+  //    register the AM.
+  // 2. AM sends a ResourceRequest for 1 container with memory 2048MB.
+  // 3. Verify the number of containers allocated by the RM.
+  // 4. Verify memory usage by the cluster; it should be 3072
+  //    (AM memory + requested memory: 1024 + 2048 = 3072).
+  // 5. Re-register the NM, sending a completed container status.
+  // 6. Verify memory used; it should be 1024.
+  // 7. Send an AM heartbeat to the RM. The allocate response should contain
+  //    the completed container.
+  @Test(timeout = 60000)
+  public void testProcessingNMContainerStatusesOnNMRestart() throws Exception {
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // 1. Start the cluster (RM, NM), submit an app with 1024MB, launch and
+    // register the AM.
+    MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+    // Stop the RM in a finally block so that a failed assertion or wait does
+    // not leave it running after this test.
+    try {
+      int nmMemory = 8192;
+      int amMemory = 1024;
+      int containerMemory = 2048;
+      MockNM nm1 =
+          new MockNM("127.0.0.1:1234", nmMemory,
+              rm1.getResourceTrackerService());
+      nm1.registerNode();
+
+      RMApp app0 = rm1.submitApp(amMemory);
+      MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
+
+      // 2. AM sends a ResourceRequest for 1 container with memory 2048MB.
+      int noOfContainers = 1;
+      List<Container> allocateContainers =
+          am0.allocateAndWaitForContainers(noOfContainers, containerMemory,
+              nm1);
+
+      // 3. Verify the number of containers allocated by the RM.
+      Assert.assertEquals(noOfContainers, allocateContainers.size());
+      Container container = allocateContainers.get(0);
+
+      nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1,
+          ContainerState.RUNNING);
+      nm1.nodeHeartbeat(am0.getApplicationAttemptId(), container.getId()
+          .getContainerId(), ContainerState.RUNNING);
+
+      rm1.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
+
+      // 4. Verify memory usage by the cluster; it should be 3072
+      // (AM memory + requested memory: 1024 + 2048 = 3072).
+      ResourceScheduler rs = rm1.getRMContext().getScheduler();
+      int allocatedMB = rs.getRootQueueMetrics().getAllocatedMB();
+      Assert.assertEquals(amMemory + containerMemory, allocatedMB);
+
+      // 5. Re-register the NM, sending a completed container status.
+      List<NMContainerStatus> nMContainerStatusForApp =
+          createNMContainerStatusForApp(am0);
+      nm1.registerNode(nMContainerStatusForApp,
+          Arrays.asList(app0.getApplicationId()));
+
+      waitForClusterMemory(nm1, rs, amMemory);
+
+      // 6. Verify memory used; it should be 1024.
+      Assert.assertEquals(amMemory, rs.getRootQueueMetrics().getAllocatedMB());
+
+      // 7. Send an AM heartbeat to the RM. The allocate response should
+      // contain the completed container.
+      AllocateRequest req =
+          AllocateRequest.newInstance(0, 0F, new ArrayList<ResourceRequest>(),
+              new ArrayList<ContainerId>(), null);
+      AllocateResponse allocate = am0.allocate(req);
+      List<ContainerStatus> completedContainersStatuses =
+          allocate.getCompletedContainersStatuses();
+      Assert.assertEquals(noOfContainers, completedContainersStatuses.size());
+
+      // Application cleanup should happen: cluster memory used drops to 0.
+      nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1,
+          ContainerState.COMPLETE);
+      waitForClusterMemory(nm1, rs, 0);
+    } finally {
+      rm1.stop();
+    }
+  }
+
+  /**
+   * Sends a node heartbeat and sleeps 100 ms repeatedly until the root queue
+   * metrics report {@code clusterMemory} as allocated MB. Fails the test once
+   * 51 polls have gone by without the value matching at the loop check.
+   */
+  private void waitForClusterMemory(MockNM nm1, ResourceScheduler rs,
+      int clusterMemory) throws Exception, InterruptedException {
+    for (int attempt = 0;
+        rs.getRootQueueMetrics().getAllocatedMB() != clusterMemory;
+        attempt++) {
+      nm1.nodeHeartbeat(true);
+
+      Thread.sleep(100);
+      if (attempt == 50) {
+        Assert.fail("Wait for cluster memory is timed out.Expected="
+            + clusterMemory + " Actual="
+            + rs.getRootQueueMetrics().getAllocatedMB());
+      }
+    }
+  }
+
+  /**
+   * Builds the two container reports used for NM re-registration, both for
+   * the application attempt of the given AM: container 1 as RUNNING with 1024
+   * and container 2 as COMPLETE with 2048 as the memory argument.
+   */
+  public static List<NMContainerStatus> createNMContainerStatusForApp(MockAM am) {
+    List<NMContainerStatus> reports = new ArrayList<NMContainerStatus>();
+    // Container 1 is reported as still running.
+    reports.add(createNMContainerStatus(am.getApplicationAttemptId(), 1,
+        ContainerState.RUNNING, 1024));
+    // Container 2 is reported as completed.
+    reports.add(createNMContainerStatus(am.getApplicationAttemptId(), 2,
+        ContainerState.COMPLETE, 2048));
+    return reports;
+  }
+
+  /**
+   * Creates an {@link NMContainerStatus} for container {@code id} of the given
+   * application attempt in the given state. The resource is built with
+   * {@code Resource.newInstance(memory, 1)} and the diagnostics text is
+   * "recover container".
+   */
+  public static NMContainerStatus createNMContainerStatus(
+      ApplicationAttemptId appAttemptId, int id, ContainerState containerState,
+      int memory) {
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId, id);
+    Resource capability = Resource.newInstance(memory, 1);
+    Priority priority = Priority.newInstance(0);
+    return NMContainerStatus.newInstance(containerId, containerState,
+        capability, "recover container", 0, priority, 0);
+  }
+
public static void main(String[] args) throws Exception {
TestApplicationCleanup t = new TestApplicationCleanup();
t.testAppCleanup();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a64dd3d2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index d877e25..c6da3fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -540,7 +540,7 @@ public class TestRMNodeTransitions {
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
- node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null));
+ node.handle(new RMNodeReconnectEvent(node.getNodeID(), node, null, null));
Assert.assertEquals("Active Nodes", initialActive, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
@@ -614,7 +614,7 @@ public class TestRMNodeTransitions {
Assert.assertEquals(nmVersion1, node.getNodeManagerVersion());
RMNodeImpl reconnectingNode = getRunningNode(nmVersion2);
node.handle(new RMNodeReconnectEvent(node.getNodeID(), reconnectingNode,
- null));
+ null, null));
Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
}
}