You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/06/13 21:19:29 UTC

[16/25] hadoop git commit: YARN-5197. RM leaks containers if running container disappears from node update. Contributed by Jason Lowe.

YARN-5197. RM leaks containers if running container disappears from node update. Contributed by Jason Lowe.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e0f4620c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e0f4620c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e0f4620c

Branch: refs/heads/HDFS-1312
Commit: e0f4620cc7db3db4b781e6042ab7dd754af28f18
Parents: 8a1dcce
Author: Rohith Sharma K S <ro...@apache.org>
Authored: Sat Jun 11 10:22:27 2016 +0530
Committer: Rohith Sharma K S <ro...@apache.org>
Committed: Sat Jun 11 10:22:27 2016 +0530

----------------------------------------------------------------------
 .../resourcemanager/rmnode/RMNodeImpl.java      | 37 +++++++++++++
 .../yarn/server/resourcemanager/MockNM.java     | 57 ++++++++++++++------
 .../resourcemanager/TestRMNodeTransitions.java  | 44 +++++++++++++++
 3 files changed, 121 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0f4620c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 4b65675..a3a6b30 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.AllocationExpirationInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -1311,6 +1313,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         new ArrayList<ContainerStatus>();
     List<ContainerStatus> completedContainers =
         new ArrayList<ContainerStatus>();
+    int numRemoteRunningContainers = 0;
     for (ContainerStatus remoteContainer : containerStatuses) {
       ContainerId containerId = remoteContainer.getContainerId();
 
@@ -1344,6 +1347,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       if (remoteContainer.getState() == ContainerState.RUNNING) {
         // Process only GUARANTEED containers in the RM.
         if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) {
+          ++numRemoteRunningContainers;
           if (!launchedContainers.contains(containerId)) {
             // Just launched container. RM knows about it the first time.
             launchedContainers.add(containerId);
@@ -1366,12 +1370,45 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         completedContainers.add(remoteContainer);
       }
     }
+    completedContainers.addAll(findLostContainers(
+          numRemoteRunningContainers, containerStatuses));
+
     if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
       nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
           completedContainers));
     }
   }
 
+  private List<ContainerStatus> findLostContainers(int numRemoteRunning,
+      List<ContainerStatus> containerStatuses) {
+    if (numRemoteRunning >= launchedContainers.size()) {
+      return Collections.emptyList();
+    }
+    Set<ContainerId> nodeContainers =
+        new HashSet<ContainerId>(numRemoteRunning);
+    List<ContainerStatus> lostContainers = new ArrayList<ContainerStatus>(
+        launchedContainers.size() - numRemoteRunning);
+    for (ContainerStatus remoteContainer : containerStatuses) {
+      if (remoteContainer.getState() == ContainerState.RUNNING
+          && remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) {
+        nodeContainers.add(remoteContainer.getContainerId());
+      }
+    }
+    Iterator<ContainerId> iter = launchedContainers.iterator();
+    while (iter.hasNext()) {
+      ContainerId containerId = iter.next();
+      if (!nodeContainers.contains(containerId)) {
+        String diag = "Container " + containerId
+            + " was running but not reported from " + nodeId;
+        LOG.warn(diag);
+        lostContainers.add(SchedulerUtils.createAbnormalContainerStatus(
+            containerId, diag));
+        iter.remove();
+      }
+    }
+    return lostContainers;
+  }
+
   private void handleLogAggregationStatus(
       List<LogAggregationReport> logAggregationReportsForApps) {
     for (LogAggregationReport report : logAggregationReportsForApps) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0f4620c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 04ea51c..2e2bef7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -57,6 +58,8 @@ public class MockNM {
   private MasterKey currentContainerTokenMasterKey;
   private MasterKey currentNMTokenMasterKey;
   private String version;
+  private Map<ContainerId, ContainerStatus> containerStats =
+      new HashMap<ContainerId, ContainerStatus>();
 
   public MockNM(String nodeIdStr, int memory, ResourceTrackerService resourceTracker) {
     // scale vcores based on the requested memory
@@ -106,14 +109,12 @@ public class MockNM {
   }
 
   public void containerIncreaseStatus(Container container) throws Exception {
-    Map<ApplicationId, List<ContainerStatus>> conts = new HashMap<>();
     ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
         container.getId(), ContainerState.RUNNING, "Success", 0,
             container.getResource());
-    conts.put(container.getId().getApplicationAttemptId().getApplicationId(),
-        Collections.singletonList(containerStatus));
     List<Container> increasedConts = Collections.singletonList(container);
-    nodeHeartbeat(conts, increasedConts, true, ++responseId);
+    nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts,
+        true, ++responseId);
   }
 
   public RegisterNodeManagerResponse registerNode() throws Exception {
@@ -147,18 +148,27 @@ public class MockNM {
       memory = (int) newResource.getMemorySize();
       vCores = newResource.getVirtualCores();
     }
+    containerStats.clear();
+    if (containerReports != null) {
+      for (NMContainerStatus report : containerReports) {
+        if (report.getContainerState() != ContainerState.COMPLETE) {
+          containerStats.put(report.getContainerId(),
+              ContainerStatus.newInstance(report.getContainerId(),
+                  report.getContainerState(), report.getDiagnostics(),
+                  report.getContainerExitStatus()));
+        }
+      }
+    }
     return registrationResponse;
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
-    return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
-        isHealthy, ++responseId);
+    return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
+        Collections.<Container>emptyList(), isHealthy, ++responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
       long containerId, ContainerState containerState) throws Exception {
-    HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
-        new HashMap<ApplicationId, List<ContainerStatus>>(1);
     ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
         BuilderUtils.newContainerId(attemptId, containerId), containerState,
         "Success", 0, BuilderUtils.newResource(memory, vCores));
@@ -166,8 +176,8 @@ public class MockNM {
         new ArrayList<ContainerStatus>(1);
     containerStatusList.add(containerStatus);
     Log.info("ContainerStatus: " + containerStatus);
-    nodeUpdate.put(attemptId.getApplicationId(), containerStatusList);
-    return nodeHeartbeat(nodeUpdate, true);
+    return nodeHeartbeat(containerStatusList,
+        Collections.<Container>emptyList(), true, ++responseId);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
@@ -177,19 +187,32 @@ public class MockNM {
 
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
       List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
-    return nodeHeartbeat(conts, new ArrayList<Container>(), isHealthy, resId);
+    ArrayList<ContainerStatus> updatedStats = new ArrayList<ContainerStatus>();
+    for (List<ContainerStatus> stats : conts.values()) {
+      updatedStats.addAll(stats);
+    }
+    return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
+        isHealthy, resId);
   }
 
-  public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
-      List<ContainerStatus>> conts, List<Container> increasedConts,
-          boolean isHealthy, int resId) throws Exception {
+  public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats,
+      List<Container> increasedConts, boolean isHealthy, int resId)
+          throws Exception {
     NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setResponseId(resId);
     status.setNodeId(nodeId);
-    for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
-      Log.info("entry.getValue() " + entry.getValue());
-      status.setContainersStatuses(entry.getValue());
+    ArrayList<ContainerId> completedContainers = new ArrayList<ContainerId>();
+    for (ContainerStatus stat : updatedStats) {
+      if (stat.getState() == ContainerState.COMPLETE) {
+        completedContainers.add(stat.getContainerId());
+      }
+      containerStats.put(stat.getContainerId(), stat);
+    }
+    status.setContainersStatuses(
+        new ArrayList<ContainerStatus>(containerStats.values()));
+    for (ContainerId cid : completedContainers) {
+      containerStats.remove(cid);
     }
     status.setIncreasedContainers(increasedConts);
     NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0f4620c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 16fe998..83a7c73 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -34,6 +34,7 @@ import java.util.Random;
 import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -1021,4 +1022,47 @@ public class TestRMNodeTransitions {
     Resource originalCapacity = node.getOriginalTotalCapability();
     assertEquals("Original total capability not null after recommission", null, originalCapacity);
   }
+
+  @Test
+  public void testDisappearingContainer() {
+    ContainerId cid1 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(1, 1), 1), 1);
+    ContainerId cid2 = BuilderUtils.newContainerId(
+        BuilderUtils.newApplicationAttemptId(
+            BuilderUtils.newApplicationId(2, 2), 2), 2);
+    ArrayList<ContainerStatus> containerStats =
+        new ArrayList<ContainerStatus>();
+    containerStats.add(ContainerStatus.newInstance(cid1,
+        ContainerState.RUNNING, "", -1));
+    containerStats.add(ContainerStatus.newInstance(cid2,
+        ContainerState.RUNNING, "", -1));
+    node = getRunningNode();
+    node.handle(getMockRMNodeStatusEvent(containerStats));
+    assertEquals("unexpected number of running containers",
+        2, node.getLaunchedContainers().size());
+    Assert.assertTrue("first container not running",
+        node.getLaunchedContainers().contains(cid1));
+    Assert.assertTrue("second container not running",
+        node.getLaunchedContainers().contains(cid2));
+    assertEquals("already completed containers",
+        0, completedContainers.size());
+    containerStats.remove(0);
+    node.handle(getMockRMNodeStatusEvent(containerStats));
+    assertEquals("expected one container to be completed",
+        1, completedContainers.size());
+    ContainerStatus cs = completedContainers.get(0);
+    assertEquals("first container not the one that completed",
+        cid1, cs.getContainerId());
+    assertEquals("completed container not marked complete",
+        ContainerState.COMPLETE, cs.getState());
+    assertEquals("completed container not marked aborted",
+        ContainerExitStatus.ABORTED, cs.getExitStatus());
+    Assert.assertTrue("completed container not marked missing",
+        cs.getDiagnostics().contains("not reported"));
+    assertEquals("unexpected number of running containers",
+        1, node.getLaunchedContainers().size());
+    Assert.assertTrue("second container not running",
+        node.getLaunchedContainers().contains(cid2));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org