You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by bi...@apache.org on 2013/04/08 21:17:16 UTC

svn commit: r1465731 - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/...

Author: bikas
Date: Mon Apr  8 19:17:16 2013
New Revision: 1465731

URL: http://svn.apache.org/r1465731
Log:
YARN-479. NM retry behavior for connection to RM should be similar for lost heartbeats (Jian He via bikas)

Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1465731&r1=1465730&r2=1465731&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Mon Apr  8 19:17:16 2013
@@ -126,6 +126,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-193. Scheduler.normalizeRequest does not account for allocation
     requests that exceed maximumAllocation limits (Zhijie Shen via bikas)
 
+    YARN-479. NM retry behavior for connection to RM should be similar for
+    lost heartbeats (Jian He via bikas)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1465731&r1=1465730&r2=1465731&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Apr  8 19:17:16 2013
@@ -87,10 +87,9 @@ public class NodeStatusUpdaterImpl exten
 
   private final NodeHealthCheckerService healthChecker;
   private final NodeManagerMetrics metrics;
-
-  private boolean previousHeartBeatSucceeded;
-  private List<ContainerStatus> previousContainersStatuses =
-      new ArrayList<ContainerStatus>();
+  private long rmConnectWaitMS;
+  private long rmConnectionRetryIntervalMS;
+  private boolean waitForEver;
 
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
@@ -99,7 +98,6 @@ public class NodeStatusUpdaterImpl exten
     this.context = context;
     this.dispatcher = dispatcher;
     this.metrics = metrics;
-    this.previousHeartBeatSucceeded = true;
   }
 
   @Override
@@ -137,8 +135,8 @@ public class NodeStatusUpdaterImpl exten
             YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
     
     LOG.info("Initialized nodemanager for " + nodeId + ":" +
-    		" physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
-    		" physical-cores=" + cpuCores + " virtual-cores=" + virtualCores);
+        " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb +
+        " physical-cores=" + cpuCores + " virtual-cores=" + virtualCores);
     
     super.init(conf);
   }
@@ -192,12 +190,12 @@ public class NodeStatusUpdaterImpl exten
 
   private void registerWithRM() throws YarnRemoteException {
     Configuration conf = getConfig();
-    long rmConnectWaitMS =
+    rmConnectWaitMS =
         conf.getInt(
             YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
             YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS)
         * 1000;
-    long rmConnectionRetryIntervalMS =
+    rmConnectionRetryIntervalMS =
         conf.getLong(
             YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
             YarnConfiguration
@@ -210,7 +208,7 @@ public class NodeStatusUpdaterImpl exten
           " should not be negative.");
     }
 
-    boolean waitForEver = (rmConnectWaitMS == -1000);
+    waitForEver = (rmConnectWaitMS == -1000);
 
     if(! waitForEver) {
       if(rmConnectWaitMS < 0) {
@@ -319,14 +317,8 @@ public class NodeStatusUpdaterImpl exten
     NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
     nodeStatus.setNodeId(this.nodeId);
 
-    List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
-    if(previousHeartBeatSucceeded) {
-      previousContainersStatuses.clear();
-    } else {
-      containersStatuses.addAll(previousContainersStatuses);
-    }
-
     int numActiveContainers = 0;
+    List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
     for (Iterator<Entry<ContainerId, Container>> i =
         this.context.getContainers().entrySet().iterator(); i.hasNext();) {
       Entry<ContainerId, Container> e = i.next();
@@ -341,7 +333,6 @@ public class NodeStatusUpdaterImpl exten
       LOG.info("Sending out status for container: " + containerStatus);
 
       if (containerStatus.getState() == ContainerState.COMPLETE) {
-        previousContainersStatuses.add(containerStatus);
         // Remove
         i.remove();
 
@@ -404,6 +395,9 @@ public class NodeStatusUpdaterImpl exten
         while (!isStopped) {
           // Send heartbeat
           try {
+            NodeHeartbeatResponse response = null;
+            int rmRetryCount = 0;
+            long waitStartTime = System.currentTimeMillis();
             NodeStatus nodeStatus = getNodeStatus();
             nodeStatus.setResponseId(lastHeartBeatID);
             
@@ -414,9 +408,31 @@ public class NodeStatusUpdaterImpl exten
               request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context
                 .getContainerTokenSecretManager().getCurrentKey());
             }
-            NodeHeartbeatResponse response =
-              resourceTracker.nodeHeartbeat(request);
-            previousHeartBeatSucceeded = true;
+            while (!isStopped) {
+              try {
+                rmRetryCount++;
+                response = resourceTracker.nodeHeartbeat(request);
+                break;
+              } catch (Throwable e) {
+                LOG.warn("Trying to heartbeat to ResourceManager, "
+                    + "current no. of failed attempts is " + rmRetryCount);
+                if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS
+                    || waitForEver) {
+                  try {
+                    LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000
+                        + " seconds before next heartbeat to RM");
+                    Thread.sleep(rmConnectionRetryIntervalMS);
+                  } catch(InterruptedException ex) {
+                    // ignore the interrupt; the enclosing loop re-checks isStopped
+                  }
+                } else {
+                  String errorMessage = "Failed to heartbeat to RM, " +
+                      "no. of failed attempts is "+rmRetryCount;
+                  LOG.error(errorMessage,e);
+                  throw new YarnException(errorMessage,e);
+                }
+              }
+            }
             //get next heartbeat interval from response
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
             // See if the master-key has rolled over
@@ -432,7 +448,7 @@ public class NodeStatusUpdaterImpl exten
             if (response.getNodeAction() == NodeAction.SHUTDOWN) {
               LOG
                   .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
-                  		" hence shutting down.");
+                      " hence shutting down.");
               dispatcher.getEventHandler().handle(
                   new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
               break;
@@ -461,8 +477,12 @@ public class NodeStatusUpdaterImpl exten
               dispatcher.getEventHandler().handle(
                   new CMgrCompletedAppsEvent(appsToCleanup));
             }
+          } catch (YarnException e) {
+            // max wait time to connect to the RM was exceeded: signal NM shutdown and rethrow
+            dispatcher.getEventHandler().handle(
+                new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
+            throw e;
           } catch (Throwable e) {
-            previousHeartBeatSucceeded = false;
             // TODO Better error handling. Thread can die with the rest of the
             // NM still running.
             LOG.error("Caught exception in status-updater", e);

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1465731&r1=1465730&r2=1465731&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Mon Apr  8 19:17:16 2013
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -51,9 +52,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -167,6 +170,10 @@ public class TestNodeStatusUpdater {
         throws YarnRemoteException {
       NodeStatus nodeStatus = request.getNodeStatus();
       LOG.info("Got heartbeat number " + heartBeatID);
+      NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class);
+      Dispatcher mockDispatcher = mock(Dispatcher.class);
+      EventHandler mockEventHandler = mock(EventHandler.class);
+      when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
       nodeStatus.setResponseId(heartBeatID++);
       Map<ApplicationId, List<ContainerStatus>> appToContainers =
           getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
@@ -183,7 +190,8 @@ public class TestNodeStatusUpdater {
         launchContext.setContainerId(firstContainerID);
         launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
         launchContext.getResource().setMemory(2);
-        Container container = new ContainerImpl(conf , null, launchContext, null, null);
+        Container container = new ContainerImpl(conf , mockDispatcher,
+            launchContext, null, mockMetrics);
         this.context.getContainers().put(firstContainerID, container);
       } else if (heartBeatID == 2) {
         // Checks on the RM end
@@ -207,7 +215,8 @@ public class TestNodeStatusUpdater {
         launchContext.setContainerId(secondContainerID);
         launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
         launchContext.getResource().setMemory(3);
-        Container container = new ContainerImpl(conf, null, launchContext, null, null);
+        Container container = new ContainerImpl(conf, mockDispatcher,
+            launchContext, null, mockMetrics);
         this.context.getContainers().put(secondContainerID, container);
       } else if (heartBeatID == 3) {
         // Checks on the RM end
@@ -229,13 +238,14 @@ public class TestNodeStatusUpdater {
   }
 
   private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
-    public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
+    public ResourceTracker resourceTracker;
     private Context context;
 
     public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
         NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
       super(context, dispatcher, healthChecker, metrics);
       this.context = context;
+      resourceTracker = new MyResourceTracker(this.context);
     }
 
     @Override
@@ -312,6 +322,21 @@ public class TestNodeStatusUpdater {
     }
   }
 
+  private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl {
+    private ResourceTracker resourceTracker;
+
+    public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
+        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+      super(context, dispatcher, healthChecker, metrics);
+      resourceTracker = new MyResourceTracker5();
+    }
+
+    @Override
+    protected ResourceTracker getRMClient() {
+      return resourceTracker;
+    }
+  }
+
   private class MyNodeManager extends NodeManager {
     
     private MyNodeStatusUpdater3 nodeStatusUpdater;
@@ -328,6 +353,32 @@ public class TestNodeStatusUpdater {
     }
   }
   
+  private class MyNodeManager2 extends NodeManager {
+    public boolean isStopped = false;
+    private NodeStatusUpdater nodeStatusUpdater;
+    private CyclicBarrier syncBarrier;
+    public MyNodeManager2 (CyclicBarrier syncBarrier) {
+      this.syncBarrier = syncBarrier;
+    }
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      nodeStatusUpdater =
+          new MyNodeStatusUpdater5(context, dispatcher, healthChecker,
+                                     metrics);
+      return nodeStatusUpdater;
+    }
+
+    @Override
+    public void stop() {
+      super.stop();
+      isStopped = true;
+      try {
+        syncBarrier.await();
+      } catch (Exception e) {
+      }
+    }
+  }
   // 
   private class MyResourceTracker2 implements ResourceTracker {
     public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
@@ -505,6 +556,26 @@ public class TestNodeStatusUpdater {
     }
   }
 
+  private class MyResourceTracker5 implements ResourceTracker {
+    public NodeAction registerNodeAction = NodeAction.NORMAL;
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnRemoteException {
+      
+      RegisterNodeManagerResponse response = recordFactory
+          .newRecordInstance(RegisterNodeManagerResponse.class);
+      response.setNodeAction(registerNodeAction );
+      return response;
+    }
+    
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnRemoteException {
+      heartBeatID++;
+      throw RPCUtil.getRemoteException("NodeHeartbeat exception");
+    }
+  }
+
   @Before
   public void clearError() {
     nmStartError = null;
@@ -883,6 +954,30 @@ public class TestNodeStatusUpdater {
     nm.stop();
   }
 
+  @Test(timeout = 20000)
+  public void testNodeStatusUpdaterRetryAndNMShutdown() 
+      throws InterruptedException {
+    final long connectionWaitSecs = 1;
+    final long connectionRetryIntervalSecs = 1;
+    YarnConfiguration conf = createNMConfig();
+    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS,
+        connectionWaitSecs);
+    conf.setLong(YarnConfiguration
+        .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS,
+        connectionRetryIntervalSecs);
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+    nm = new MyNodeManager2(syncBarrier);
+    nm.init(conf);
+    nm.start();
+    try {
+      syncBarrier.await();
+    } catch (Exception e) {
+    }
+    Assert.assertTrue(((MyNodeManager2) nm).isStopped);
+    Assert.assertTrue("calculate heartBeatCount based on" +
+        " connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2);
+  }
+
   private class MyNMContext extends NMContext {
     ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();