You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2016/06/13 09:27:58 UTC
[38/51] [abbrv] hadoop git commit: YARN-5197. RM leaks containers if running container disappears from node update. Contributed by Jason Lowe.
YARN-5197. RM leaks containers if running container disappears from node update. Contributed by Jason Lowe.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e0f4620c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e0f4620c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e0f4620c
Branch: refs/heads/YARN-3926
Commit: e0f4620cc7db3db4b781e6042ab7dd754af28f18
Parents: 8a1dcce
Author: Rohith Sharma K S <ro...@apache.org>
Authored: Sat Jun 11 10:22:27 2016 +0530
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Sat Jun 11 10:22:27 2016 +0530
----------------------------------------------------------------------
.../resourcemanager/rmnode/RMNodeImpl.java | 37 +++++++++++++
.../yarn/server/resourcemanager/MockNM.java | 57 ++++++++++++++------
.../resourcemanager/TestRMNodeTransitions.java | 44 +++++++++++++++
3 files changed, 121 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0f4620c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 4b65675..a3a6b30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -1311,6 +1313,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers =
new ArrayList<ContainerStatus>();
+ int numRemoteRunningContainers = 0;
for (ContainerStatus remoteContainer : containerStatuses) {
ContainerId containerId = remoteContainer.getContainerId();
@@ -1344,6 +1347,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
if (remoteContainer.getState() == ContainerState.RUNNING) {
// Process only GUARANTEED containers in the RM.
if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) {
+ ++numRemoteRunningContainers;
if (!launchedContainers.contains(containerId)) {
// Just launched container. RM knows about it the first time.
launchedContainers.add(containerId);
@@ -1366,12 +1370,45 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
completedContainers.add(remoteContainer);
}
}
+ completedContainers.addAll(findLostContainers(
+ numRemoteRunningContainers, containerStatuses));
+
if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
completedContainers));
}
}
+ private List<ContainerStatus> findLostContainers(int numRemoteRunning,
+ List<ContainerStatus> containerStatuses) {
+ if (numRemoteRunning >= launchedContainers.size()) {
+ return Collections.emptyList();
+ }
+ Set<ContainerId> nodeContainers =
+ new HashSet<ContainerId>(numRemoteRunning);
+ List<ContainerStatus> lostContainers = new ArrayList<ContainerStatus>(
+ launchedContainers.size() - numRemoteRunning);
+ for (ContainerStatus remoteContainer : containerStatuses) {
+ if (remoteContainer.getState() == ContainerState.RUNNING
+ && remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) {
+ nodeContainers.add(remoteContainer.getContainerId());
+ }
+ }
+ Iterator<ContainerId> iter = launchedContainers.iterator();
+ while (iter.hasNext()) {
+ ContainerId containerId = iter.next();
+ if (!nodeContainers.contains(containerId)) {
+ String diag = "Container " + containerId
+ + " was running but not reported from " + nodeId;
+ LOG.warn(diag);
+ lostContainers.add(SchedulerUtils.createAbnormalContainerStatus(
+ containerId, diag));
+ iter.remove();
+ }
+ }
+ return lostContainers;
+ }
+
private void handleLogAggregationStatus(
List<LogAggregationReport> logAggregationReportsForApps) {
for (LogAggregationReport report : logAggregationReportsForApps) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0f4620c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 04ea51c..2e2bef7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -28,6 +28,7 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -57,6 +58,8 @@ public class MockNM {
private MasterKey currentContainerTokenMasterKey;
private MasterKey currentNMTokenMasterKey;
private String version;
+ private Map<ContainerId, ContainerStatus> containerStats =
+ new HashMap<ContainerId, ContainerStatus>();
public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
// scale vcores based on the requested memory
@@ -106,14 +109,12 @@ public class MockNM {
}
public void containerIncreaseStatus(Container container) throws Exception {
- Map<ApplicationId, List<ContainerStatus>> conts = new HashMap<>();
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
container.getId(), ContainerState.RUNNING, "Success", 0,
container.getResource());
- conts.put(container.getId().getApplicationAttemptId().getApplicationId(),
- Collections.singletonList(containerStatus));
List<Container> increasedConts = Collections.singletonList(container);
- nodeHeartbeat(conts, increasedConts, true, ++responseId);
+ nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts,
+ true, ++responseId);
}
public RegisterNodeManagerResponse registerNode() throws Exception {
@@ -147,18 +148,27 @@ public class MockNM {
memory = (int) newResource.getMemorySize();
vCores = newResource.getVirtualCores();
}
+ containerStats.clear();
+ if (containerReports != null) {
+ for (NMContainerStatus report : containerReports) {
+ if (report.getContainerState() != ContainerState.COMPLETE) {
+ containerStats.put(report.getContainerId(),
+ ContainerStatus.newInstance(report.getContainerId(),
+ report.getContainerState(), report.getDiagnostics(),
+ report.getContainerExitStatus()));
+ }
+ }
+ }
return registrationResponse;
}
public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
- return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
- isHealthy, ++responseId);
+ return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
+ Collections.<Container>emptyList(), isHealthy, ++responseId);
}
public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
long containerId, ContainerState containerState) throws Exception {
- HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
- new HashMap<ApplicationId, List<ContainerStatus>>(1);
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
BuilderUtils.newContainerId(attemptId, containerId), containerState,
"Success", 0, BuilderUtils.newResource(memory, vCores));
@@ -166,8 +176,8 @@ public class MockNM {
new ArrayList<ContainerStatus>(1);
containerStatusList.add(containerStatus);
Log.info("ContainerStatus: " + containerStatus);
- nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
- return nodeHeartbeat(nodeUpdate, true);
+ return nodeHeartbeat(containerStatusList,
+ Collections.<Container>emptyList(), true, ++responseId);
}
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
@@ -177,19 +187,32 @@ public class MockNM {
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
- return nodeHeartbeat(conts, new ArrayList<Container>(), isHealthy, resId);
+ ArrayList<ContainerStatus> updatedStats = new ArrayList<ContainerStatus>();
+ for (List<ContainerStatus> stats : conts.values()) {
+ updatedStats.addAll(stats);
+ }
+ return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
+ isHealthy, resId);
}
- public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
- List<ContainerStatus>> conts, List<Container> increasedConts,
- boolean isHealthy, int resId) throws Exception {
+ public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
+ List<Container> increasedConts, boolean isHealthy, int resId)
+ throws Exception {
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus status = Records.newRecord(NodeStatus.class);
status.setResponseId(resId);
status.setNodeId(nodeId);
- for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
- Log.info("entry.getValue() " + entry.getValue());
- status.setContainersStatuses(entry.getValue());
+ ArrayList<ContainerId> completedContainers = new ArrayList<ContainerId>();
+ for (ContainerStatus stat : updatedStats) {
+ if (stat.getState() == ContainerState.COMPLETE) {
+ completedContainers.add(stat.getContainerId());
+ }
+ containerStats.put(stat.getContainerId(), stat);
+ }
+ status.setContainersStatuses(
+ new ArrayList<ContainerStatus>(containerStats.values()));
+ for (ContainerId cid : completedContainers) {
+ containerStats.remove(cid);
}
status.setIncreasedContainers(increasedConts);
NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0f4620c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 16fe998..83a7c73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -34,6 +34,7 @@ import java.util.Random;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -1021,4 +1022,47 @@ public class TestRMNodeTransitions {
Resource originalCapacity = node.getOriginalTotalCapability();
assertEquals("Original total capability not null after recommission", null, originalCapacity);
}
+
+ @Test
+ public void testDisappearingContainer() {
+ ContainerId cid1 = BuilderUtils.newContainerId(
+ BuilderUtils.newApplicationAttemptId(
+ BuilderUtils.newApplicationId(1, 1), 1), 1);
+ ContainerId cid2 = BuilderUtils.newContainerId(
+ BuilderUtils.newApplicationAttemptId(
+ BuilderUtils.newApplicationId(2, 2), 2), 2);
+ ArrayList<ContainerStatus> containerStats =
+ new ArrayList<ContainerStatus>();
+ containerStats.add(ContainerStatus.newInstance(cid1,
+ ContainerState.RUNNING, "", -1));
+ containerStats.add(ContainerStatus.newInstance(cid2,
+ ContainerState.RUNNING, "", -1));
+ node = getRunningNode();
+ node.handle(getMockRMNodeStatusEvent(containerStats));
+ assertEquals("unexpected number of running containers",
+ 2, node.getLaunchedContainers().size());
+ Assert.assertTrue("first container not running",
+ node.getLaunchedContainers().contains(cid1));
+ Assert.assertTrue("second container not running",
+ node.getLaunchedContainers().contains(cid2));
+ assertEquals("already completed containers",
+ 0, completedContainers.size());
+ containerStats.remove(0);
+ node.handle(getMockRMNodeStatusEvent(containerStats));
+ assertEquals("expected one container to be completed",
+ 1, completedContainers.size());
+ ContainerStatus cs = completedContainers.get(0);
+ assertEquals("first container not the one that completed",
+ cid1, cs.getContainerId());
+ assertEquals("completed container not marked complete",
+ ContainerState.COMPLETE, cs.getState());
+ assertEquals("completed container not marked aborted",
+ ContainerExitStatus.ABORTED, cs.getExitStatus());
+ Assert.assertTrue("completed container not marked missing",
+ cs.getDiagnostics().contains("not reported"));
+ assertEquals("unexpected number of running containers",
+ 1, node.getLaunchedContainers().size());
+ Assert.assertTrue("second container not running",
+ node.getLaunchedContainers().contains(cid2));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org