You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aw...@apache.org on 2015/07/31 23:53:43 UTC

[03/10] hadoop git commit: YARN-433. When RM is catching up with node updates then it should not expire acquired containers. Contributed by Xuan Gong

YARN-433. When RM is catching up with node updates then it should not expire acquired containers. Contributed by Xuan Gong


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab80e277
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab80e277
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab80e277

Branch: refs/heads/HADOOP-12111
Commit: ab80e277039a586f6d6259b2511ac413e29ea4f8
Parents: 2087eaf
Author: Zhihai Xu <zx...@apache.org>
Authored: Thu Jul 30 21:56:25 2015 -0700
Committer: Zhihai Xu <zx...@apache.org>
Committed: Thu Jul 30 21:57:11 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../rmcontainer/RMContainerImpl.java            | 27 +--------
 .../resourcemanager/rmnode/RMNodeImpl.java      |  8 +++
 .../resourcemanager/TestRMNodeTransitions.java  | 62 ++++++++++++++++----
 4 files changed, 63 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab80e277/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f027c29..1da2dbc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -716,6 +716,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3971. Skip RMNodeLabelsManager#checkRemoveFromClusterNodeLabelsOfQueue 
     on nodelabel recovery. (Bibin A Chundatt via wangda)
 
+    YARN-433. When RM is catching up with node updates then it should not expire
+    acquired containers. (Xuan Gong via zxu)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab80e277/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index f7d3f56..940f76f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -99,9 +99,9 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
 
     // Transitions from ACQUIRED state
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
-        RMContainerEventType.LAUNCHED, new LaunchedTransition())
+        RMContainerEventType.LAUNCHED)
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED,
-        RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState())
+        RMContainerEventType.FINISHED, new FinishedTransition())
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED,
         RMContainerEventType.RELEASED, new KillTransition())
     .addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED,
@@ -486,16 +486,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     }
   }
 
-  private static final class LaunchedTransition extends BaseTransition {
-
-    @Override
-    public void transition(RMContainerImpl container, RMContainerEvent event) {
-      // Unregister from containerAllocationExpirer.
-      container.containerAllocationExpirer.unregister(container
-          .getContainerId());
-    }
-  }
-
   private static final class ContainerRescheduledTransition extends
       FinishedTransition {
 
@@ -554,19 +544,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
     }
   }
 
-  private static final class ContainerFinishedAtAcquiredState extends
-      FinishedTransition {
-    @Override
-    public void transition(RMContainerImpl container, RMContainerEvent event) {
-      // Unregister from containerAllocationExpirer.
-      container.containerAllocationExpirer.unregister(container
-          .getContainerId());
-
-      // Inform AppAttempt
-      super.transition(container, event);
-    }
-  }
-
   private static final class KillTransition extends FinishedTransition {
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab80e277/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 09b9278..f182d02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -107,6 +108,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private long lastHealthReportTime;
   private String nodeManagerVersion;
 
+  private final ContainerAllocationExpirer containerAllocationExpirer;
   /* set of containers that have just launched */
   private final Set<ContainerId> launchedContainers =
     new HashSet<ContainerId>();
@@ -265,6 +267,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     this.stateMachine = stateMachineFactory.make(this);
     
     this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
+
+    this.containerAllocationExpirer = context.getContainerAllocationExpirer();
   }
 
   @Override
@@ -953,11 +957,15 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
           // Just launched container. RM knows about it the first time.
           launchedContainers.add(containerId);
           newlyLaunchedContainers.add(remoteContainer);
+          // Unregister from containerAllocationExpirer.
+          containerAllocationExpirer.unregister(containerId);
         }
       } else {
         // A finished container
         launchedContainers.remove(containerId);
         completedContainers.add(remoteContainer);
+        // Unregister from containerAllocationExpirer.
+        containerAllocationExpirer.unregister(containerId);
       }
     }
     if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab80e277/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index ece896b..4964c59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -31,6 +31,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
@@ -105,8 +107,9 @@ public class TestRMNodeTransitions {
     InlineDispatcher rmDispatcher = new InlineDispatcher();
     
     rmContext =
-        new RMContextImpl(rmDispatcher, null, null, null,
-            mock(DelegationTokenRenewer.class), null, null, null, null, null);
+        new RMContextImpl(rmDispatcher, mock(ContainerAllocationExpirer.class),
+          null, null, mock(DelegationTokenRenewer.class), null, null, null,
+          null, null);
     NodesListManager nodesListManager = mock(NodesListManager.class);
     HostsFileReader reader = mock(HostsFileReader.class);
     when(nodesListManager.getHostsReader()).thenReturn(reader);
@@ -147,7 +150,8 @@ public class TestRMNodeTransitions {
   public void tearDown() throws Exception {
   }
   
-  private RMNodeStatusEvent getMockRMNodeStatusEvent() {
+  private RMNodeStatusEvent getMockRMNodeStatusEvent(
+      List<ContainerStatus> containerStatus) {
     NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
 
     NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
@@ -158,6 +162,9 @@ public class TestRMNodeTransitions {
     doReturn(healthStatus).when(event).getNodeHealthStatus();
     doReturn(response).when(event).getLatestResponse();
     doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
+    if (containerStatus != null) {
+      doReturn(containerStatus).when(event).getContainers();
+    }
     return event;
   }
   
@@ -176,7 +183,7 @@ public class TestRMNodeTransitions {
     
     // Now verify that scheduler isn't notified of an expired container
     // by checking number of 'completedContainers' it got in the previous event
-    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
+    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null);
     ContainerStatus containerStatus = mock(ContainerStatus.class);
     doReturn(completedContainerId).when(containerStatus).getContainerId();
     doReturn(Collections.singletonList(containerStatus)).
@@ -207,11 +214,11 @@ public class TestRMNodeTransitions {
     ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
         BuilderUtils.newApplicationAttemptId(
             BuilderUtils.newApplicationId(1, 1), 1), 2);
- 
-    RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent();
-    RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent();
-    RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent();
-    
+
+    RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null);
+    RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null);
+    RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent(null);
+
     ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class);
     ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class);
     ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class);
@@ -263,8 +270,8 @@ public class TestRMNodeTransitions {
         BuilderUtils.newApplicationAttemptId(
             BuilderUtils.newApplicationId(1, 1), 1), 1);
         
-    RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent();
-    RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent();
+    RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null);
+    RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent(null);
 
     ContainerStatus containerStatus1 = mock(ContainerStatus.class);
     ContainerStatus containerStatus2 = mock(ContainerStatus.class);
@@ -499,7 +506,7 @@ public class TestRMNodeTransitions {
 
     // Verify status update does not clear containers/apps to cleanup
     // but updating heartbeat response for cleanup does
-    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
+    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null);
     node.handle(statusEvent);
     Assert.assertEquals(1, node.getContainersToCleanUp().size());
     Assert.assertEquals(1, node.getAppsToCleanup().size());
@@ -706,4 +713,35 @@ public class TestRMNodeTransitions {
         null, null));
     Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
   }
+
+  @Test
+  public void testContainerExpire() throws Exception {
+    ContainerAllocationExpirer mockExpirer =
+        mock(ContainerAllocationExpirer.class);
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L);
+    ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L);
+    mockExpirer.register(containerId1);
+    mockExpirer.register(containerId2);
+    verify(mockExpirer).register(containerId1);
+    verify(mockExpirer).register(containerId2);
+    ((RMContextImpl) rmContext).setContainerAllocationExpirer(mockExpirer);
+    RMNodeImpl rmNode = getRunningNode();
+    ContainerStatus status1 =
+        ContainerStatus
+          .newInstance(containerId1, ContainerState.RUNNING, "", 0);
+    ContainerStatus status2 =
+        ContainerStatus.newInstance(containerId2, ContainerState.COMPLETE, "",
+          0);
+    List<ContainerStatus> statusList = new ArrayList<ContainerStatus>();
+    statusList.add(status1);
+    statusList.add(status2);
+    RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(statusList);
+    rmNode.handle(statusEvent);
+    verify(mockExpirer).unregister(containerId1);
+    verify(mockExpirer).unregister(containerId2);
+  }
 }