You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/03 18:57:40 UTC

svn commit: r1464106 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-...

Author: vinodkv
Date: Wed Apr  3 16:57:40 2013
New Revision: 1464106

URL: http://svn.apache.org/r1464106
Log:
YARN-101. Fix NodeManager heartbeat processing to not lose track of completed containers in case of dropped heartbeats. Contributed by Xuan Gong.
svn merge --ignore-ancestry -c 1464105 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1464106&r1=1464105&r2=1464106&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Apr  3 16:57:40 2013
@@ -125,6 +125,9 @@ Release 2.0.5-beta - UNRELEASED
     local directory hits unix file count limits and thus prevent job failures.
     (Omkar Vinit Joshi via vinodkv)
 
+    YARN-101. Fix NodeManager heartbeat processing to not lose track of completed
+    containers in case of dropped heartbeats. (Xuan Gong via vinodkv)
+
 Release 2.0.4-alpha - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1464106&r1=1464105&r2=1464106&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Wed Apr  3 16:57:40 2013
@@ -119,6 +119,10 @@ public class NodeManager extends Composi
     return new DeletionService(exec);
   }
 
+  protected NMContext createNMContext(NMContainerTokenSecretManager containerTokenSecretManager) {
+    return new NMContext(containerTokenSecretManager);
+  }
+
   protected void doSecureLogin() throws IOException {
     SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB,
         YarnConfiguration.NM_PRINCIPAL);
@@ -137,7 +141,7 @@ public class NodeManager extends Composi
       containerTokenSecretManager = new NMContainerTokenSecretManager(conf);
     }
 
-    this.context = new NMContext(containerTokenSecretManager);
+    this.context = createNMContext(containerTokenSecretManager);
 
     this.aclsManager = new ApplicationACLsManager(conf);
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1464106&r1=1464105&r2=1464106&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Apr  3 16:57:40 2013
@@ -88,6 +88,10 @@ public class NodeStatusUpdaterImpl exten
   private final NodeHealthCheckerService healthChecker;
   private final NodeManagerMetrics metrics;
 
+  private boolean previousHeartBeatSucceeded;
+  private List<ContainerStatus> previousContainersStatuses =
+      new ArrayList<ContainerStatus>();
+
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
     super(NodeStatusUpdaterImpl.class.getName());
@@ -95,6 +99,7 @@ public class NodeStatusUpdaterImpl exten
     this.context = context;
     this.dispatcher = dispatcher;
     this.metrics = metrics;
+    this.previousHeartBeatSucceeded = true;
   }
 
   @Override
@@ -314,8 +319,14 @@ public class NodeStatusUpdaterImpl exten
     NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
     nodeStatus.setNodeId(this.nodeId);
 
-    int numActiveContainers = 0;
     List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
+    if(previousHeartBeatSucceeded) {
+      previousContainersStatuses.clear();
+    } else {
+      containersStatuses.addAll(previousContainersStatuses);
+    }
+
+    int numActiveContainers = 0;
     for (Iterator<Entry<ContainerId, Container>> i =
         this.context.getContainers().entrySet().iterator(); i.hasNext();) {
       Entry<ContainerId, Container> e = i.next();
@@ -330,6 +341,7 @@ public class NodeStatusUpdaterImpl exten
       LOG.info("Sending out status for container: " + containerStatus);
 
       if (containerStatus.getState() == ContainerState.COMPLETE) {
+        previousContainersStatuses.add(containerStatus);
         // Remove
         i.remove();
 
@@ -404,6 +416,7 @@ public class NodeStatusUpdaterImpl exten
             }
             NodeHeartbeatResponse response =
               resourceTracker.nodeHeartbeat(request);
+            previousHeartBeatSucceeded = true;
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
             // See if the master-key has rolled over
@@ -449,6 +462,7 @@ public class NodeStatusUpdaterImpl exten
                   new CMgrCompletedAppsEvent(appsToCleanup));
             }
           } catch (Throwable e) {
+            previousHeartBeatSucceeded = false;
             // TODO Better error handling. Thread can die with the rest of the
             // NM still running.
             LOG.error("Caught exception in status-updater", e);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1464106&r1=1464105&r2=1464106&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Wed Apr  3 16:57:40 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -29,6 +30,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -58,11 +61,13 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.service.Service;
@@ -92,6 +97,8 @@ public class TestNodeStatusUpdater {
   private final Configuration conf = createNMConfig();
   private NodeManager nm;
   protected NodeManager rebootedNodeManager;
+  private boolean containerStatusBackupSuccessfully = true;
+  private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
 
   @After
   public void tearDown() {
@@ -237,6 +244,22 @@ public class TestNodeStatusUpdater {
     }
   }
 
+  private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl {
+    public ResourceTracker resourceTracker;
+
+    public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher,
+        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+      super(context, dispatcher, healthChecker, metrics);
+      resourceTracker = new MyResourceTracker4(context);
+    }
+
+    @Override
+    protected ResourceTracker getRMClient() {
+      return resourceTracker;
+    }
+
+  }
+
   private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
     public ResourceTracker resourceTracker;
     private Context context;
@@ -384,6 +407,104 @@ public class TestNodeStatusUpdater {
     }
   }
 
+  private class MyResourceTracker4 implements ResourceTracker {
+
+    public NodeAction registerNodeAction = NodeAction.NORMAL;
+    public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
+    private Context context;
+
+    public MyResourceTracker4(Context context) {
+      this.context = context;
+    }
+
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnRemoteException {
+      RegisterNodeManagerResponse response = recordFactory
+          .newRecordInstance(RegisterNodeManagerResponse.class);
+      response.setNodeAction(registerNodeAction);
+      return response;
+    }
+
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnRemoteException {
+      try {
+        if (heartBeatID == 0) {
+          Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
+              .size(), 0);
+          Assert.assertEquals(context.getContainers().size(), 0);
+        } else if (heartBeatID == 1) {
+          Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
+              .size(), 5);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(0).getState() == ContainerState.RUNNING
+              && request.getNodeStatus().getContainersStatuses().get(0)
+                  .getContainerId().getId() == 1);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(1).getState() == ContainerState.RUNNING
+              && request.getNodeStatus().getContainersStatuses().get(1)
+                  .getContainerId().getId() == 2);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(2).getState() == ContainerState.COMPLETE
+              && request.getNodeStatus().getContainersStatuses().get(2)
+                  .getContainerId().getId() == 3);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(3).getState() == ContainerState.COMPLETE
+              && request.getNodeStatus().getContainersStatuses().get(3)
+                  .getContainerId().getId() == 4);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(4).getState() == ContainerState.RUNNING
+              && request.getNodeStatus().getContainersStatuses().get(4)
+                  .getContainerId().getId() == 5);
+          throw new YarnException("Lost the heartbeat response");
+        } else if (heartBeatID == 2) {
+          Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
+              .size(), 7);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(0).getState() == ContainerState.COMPLETE
+              && request.getNodeStatus().getContainersStatuses().get(0)
+                  .getContainerId().getId() == 3);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(1).getState() == ContainerState.COMPLETE
+              && request.getNodeStatus().getContainersStatuses().get(1)
+                  .getContainerId().getId() == 4);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(2).getState() == ContainerState.RUNNING
+              && request.getNodeStatus().getContainersStatuses().get(2)
+                  .getContainerId().getId() == 1);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(3).getState() == ContainerState.RUNNING
+              && request.getNodeStatus().getContainersStatuses().get(3)
+                  .getContainerId().getId() == 2);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(4).getState() == ContainerState.RUNNING
+              && request.getNodeStatus().getContainersStatuses().get(4)
+                  .getContainerId().getId() == 5);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(5).getState() == ContainerState.RUNNING
+              && request.getNodeStatus().getContainersStatuses().get(5)
+                  .getContainerId().getId() == 6);
+          Assert.assertTrue(request.getNodeStatus().getContainersStatuses()
+              .get(6).getState() == ContainerState.COMPLETE
+              && request.getNodeStatus().getContainersStatuses().get(6)
+                  .getContainerId().getId() == 7);
+        }
+      } catch (AssertionError error) {
+        LOG.info(error);
+        containerStatusBackupSuccessfully = false;
+      } finally {
+        heartBeatID++;
+      }
+      NodeStatus nodeStatus = request.getNodeStatus();
+      nodeStatus.setResponseId(heartBeatID);
+      NodeHeartbeatResponse nhResponse =
+          YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
+              heartBeatNodeAction, null, null, null, 1000L);
+      return nhResponse;
+    }
+  }
+
   @Before
   public void clearError() {
     nmStartError = null;
@@ -725,6 +846,127 @@ public class TestNodeStatusUpdater {
     }
   }
 
+  /**
+   * Test completed containerStatus get back up when heart beat lost
+   */
+  @Test(timeout = 20000)
+  public void testCompletedContainerStatusBackup() throws Exception {
+    nm = new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+        MyNodeStatusUpdater2 myNodeStatusUpdater =
+            new MyNodeStatusUpdater2(context, dispatcher, healthChecker,
+                metrics);
+        return myNodeStatusUpdater;
+      }
+
+      @Override
+      protected NMContext createNMContext(
+          NMContainerTokenSecretManager containerTokenSecretManager) {
+        return new MyNMContext(containerTokenSecretManager);
+      }
+
+    };
+
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    nm.start();
+
+    int waitCount = 0;
+    while (heartBeatID <= 3 && waitCount++ != 20) {
+      Thread.sleep(500);
+    }
+    if(!containerStatusBackupSuccessfully) {
+      Assert.fail("ContainerStatus Backup failed");
+    }
+    nm.stop();
+  }
+
+  private class MyNMContext extends NMContext {
+    ConcurrentMap<ContainerId, Container> containers =
+        new ConcurrentSkipListMap<ContainerId, Container>();
+
+    public MyNMContext(NMContainerTokenSecretManager
+        containerTokenSecretManager) {
+      super(containerTokenSecretManager);
+    }
+
+    @Override
+    public ConcurrentMap<ContainerId, Container> getContainers() {
+      if (heartBeatID == 0) {
+        return containers;
+      } else if (heartBeatID == 1) {
+        ContainerStatus containerStatus1 =
+            createContainerStatus(1, ContainerState.RUNNING);
+        Container container1 = getMockContainer(containerStatus1);
+        containers.put(containerStatus1.getContainerId(), container1);
+
+        ContainerStatus containerStatus2 =
+            createContainerStatus(2, ContainerState.RUNNING);
+        Container container2 = getMockContainer(containerStatus2);
+        containers.put(containerStatus2.getContainerId(), container2);
+
+        ContainerStatus containerStatus3 =
+            createContainerStatus(3, ContainerState.COMPLETE);
+        Container container3 = getMockContainer(containerStatus3);
+        containers.put(containerStatus3.getContainerId(), container3);
+        completedContainerStatusList.add(containerStatus3);
+
+        ContainerStatus containerStatus4 =
+            createContainerStatus(4, ContainerState.COMPLETE);
+        Container container4 = getMockContainer(containerStatus4);
+        containers.put(containerStatus4.getContainerId(), container4);
+        completedContainerStatusList.add(containerStatus4);
+
+        ContainerStatus containerStatus5 =
+            createContainerStatus(5, ContainerState.RUNNING);
+        Container container5 = getMockContainer(containerStatus5);
+        containers.put(containerStatus5.getContainerId(), container5);
+
+        return containers;
+      } else if (heartBeatID == 2) {
+        ContainerStatus containerStatus6 =
+            createContainerStatus(6, ContainerState.RUNNING);
+        Container container6 = getMockContainer(containerStatus6);
+        containers.put(containerStatus6.getContainerId(), container6);
+
+        ContainerStatus containerStatus7 =
+            createContainerStatus(7, ContainerState.COMPLETE);
+        Container container7 = getMockContainer(containerStatus7);
+        containers.put(containerStatus7.getContainerId(), container7);
+        completedContainerStatusList.add(containerStatus7);
+
+        return containers;
+      } else {
+        containers.clear();
+
+        return containers;
+      }
+    }
+
+    private ContainerStatus createContainerStatus(int id,
+        ContainerState containerState) {
+      ApplicationId applicationId =
+          BuilderUtils.newApplicationId(System.currentTimeMillis(), id);
+      ApplicationAttemptId applicationAttemptId =
+          BuilderUtils.newApplicationAttemptId(applicationId, id);
+      ContainerId contaierId =
+          BuilderUtils.newContainerId(applicationAttemptId, id);
+      ContainerStatus containerStatus =
+          BuilderUtils.newContainerStatus(contaierId, containerState,
+              "test_containerStatus: id=" + id + ", containerState: "
+                  + containerState, 0);
+      return containerStatus;
+    }
+
+    private Container getMockContainer(ContainerStatus containerStatus) {
+      Container container = mock(Container.class);
+      when(container.cloneAndGetContainerStatus()).thenReturn(containerStatus);
+      return container;
+    }
+  }
+
   private void verifyNodeStartFailure(String errMessage) {
     YarnConfiguration conf = createNMConfig();
     nm.init(conf);