You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this text rendering.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/04/11 04:01:45 UTC

svn commit: r1466753 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-commo...

Author: vinodkv
Date: Thu Apr 11 02:01:44 2013
New Revision: 1466753

URL: http://svn.apache.org/r1466753
Log:
YARN-495. Changed NM reboot behaviour to be a simple resync - kill all containers and re-register with RM. Contributed by Jian He.
svn merge --ignore-ancestry -c 1466752 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1466753&r1=1466752&r2=1466753&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu Apr 11 02:01:44 2013
@@ -71,6 +71,9 @@ Release 2.0.5-beta - UNRELEASED
     YARN-479. NM retry behavior for connection to RM should be similar for
     lost heartbeats (Jian He via bikas)
 
+    YARN-495. Changed NM reboot behaviour to be a simple resync - kill all
+    containers  and re-register with RM. (Jian He via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java?rev=1466753&r1=1466752&r2=1466753&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java Thu Apr 11 02:01:44 2013
@@ -24,5 +24,5 @@ package org.apache.hadoop.yarn.server.ap
  */
 
 public enum NodeAction {
-  NORMAL, REBOOT, SHUTDOWN
+  NORMAL, RESYNC, SHUTDOWN
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto?rev=1466753&r1=1466752&r2=1466753&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto Thu Apr 11 02:01:44 2013
@@ -25,7 +25,7 @@ import "yarn_protos.proto";
 
 enum NodeActionProto {
   NORMAL = 0;
-  REBOOT = 1;
+  RESYNC = 1;
   SHUTDOWN = 2;
 }
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1466753&r1=1466752&r2=1466753&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Thu Apr 11 02:01:44 2013
@@ -81,6 +81,7 @@ public class NodeManager extends Composi
   private Context context;
   private AsyncDispatcher dispatcher;
   private ContainerManagerImpl containerManager;
+  private NodeStatusUpdater nodeStatusUpdater;
   private static CompositeServiceShutdownHook nodeManagerShutdownHook; 
   
   private long waitForContainersOnShutdownMillis;
@@ -163,7 +164,7 @@ public class NodeManager extends Composi
     addService(nodeHealthChecker);
     dirsHandler = nodeHealthChecker.getDiskHandler();
 
-    NodeStatusUpdater nodeStatusUpdater =
+    nodeStatusUpdater =
         createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
 
     NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
@@ -214,35 +215,67 @@ public class NodeManager extends Composi
     if (isStopping.getAndSet(true)) {
       return;
     }
-    
-    cleanupContainers();
+
+    cleanupContainers(NodeManagerEventType.SHUTDOWN);
     super.stop();
     DefaultMetricsSystem.shutdown();
   }
-  
+
+  protected void cleanupContainersOnResync() {
+    //we do not want to block dispatcher thread here
+    new Thread() {
+      @Override
+      public void run() {
+        cleanupContainers(NodeManagerEventType.RESYNC);
+        ((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
+      }
+    }.start();
+  }
+
   @SuppressWarnings("unchecked")
-  protected void cleanupContainers() {
+  protected void cleanupContainers(NodeManagerEventType eventType) {
     Map<ContainerId, Container> containers = context.getContainers();
     if (containers.isEmpty()) {
       return;
     }
-    LOG.info("Containers still running on shutdown: " + containers.keySet());
+    LOG.info("Containers still running on " + eventType + " : "
+        + containers.keySet());
     
-    List<ContainerId> containerIds = new ArrayList<ContainerId>(containers.keySet());
+    List<ContainerId> containerIds =
+        new ArrayList<ContainerId>(containers.keySet());
     dispatcher.getEventHandler().handle(
         new CMgrCompletedContainersEvent(containerIds, 
             CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN));
     
     LOG.info("Waiting for containers to be killed");
     
-    long waitStartTime = System.currentTimeMillis();
-    while (!containers.isEmpty() && 
-        System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException ex) {
-        LOG.warn("Interrupted while sleeping on container kill", ex);
+    switch (eventType) {
+    case SHUTDOWN:
+      long waitStartTime = System.currentTimeMillis();
+      while (!containers.isEmpty()
+          && System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          LOG.warn("Interrupted while sleeping on container kill on shutdown",
+            ex);
+        }
       }
+      break;
+    case RESYNC:
+      while (!containers.isEmpty()) {
+        try {
+          Thread.sleep(1000);
+          //to remove done containers from the map
+          nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
+        } catch (InterruptedException ex) {
+          LOG.warn("Interrupted while sleeping on container kill on resync",
+            ex);
+        }
+      }
+      break;
+    default:
+      LOG.warn("Invalid eventType: " + eventType);
     }
 
     // All containers killed
@@ -342,9 +375,8 @@ public class NodeManager extends Composi
     case SHUTDOWN:
       stop();
       break;
-    case REBOOT:
-      stop();
-      reboot();
+    case RESYNC:
+      cleanupContainersOnResync();
       break;
     default:
       LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
@@ -361,6 +393,11 @@ public class NodeManager extends Composi
     return containerManager;
   }
   
+  //For testing
+  Dispatcher getNMDispatcher(){
+    return dispatcher;
+  }
+
   @VisibleForTesting
   Context getNMContext() {
     return this.context;

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java?rev=1466753&r1=1466752&r2=1466753&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java Thu Apr 11 02:01:44 2013
@@ -18,5 +18,5 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 public enum NodeManagerEventType {
-  SHUTDOWN, REBOOT
+  SHUTDOWN, RESYNC
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1466753&r1=1466752&r2=1466753&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Thu Apr 11 02:01:44 2013
@@ -18,9 +18,11 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.service.Service;
 
 public interface NodeStatusUpdater extends Service {
 
   void sendOutofBandHeartBeat();
+  NodeStatus getNodeStatusAndUpdateContainersInContext();
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1466753&r1=1466752&r2=1466753&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Thu Apr 11 02:01:44 2013
@@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.service.AbstractService;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class NodeStatusUpdaterImpl extends AbstractService implements
     NodeStatusUpdater {
 
@@ -91,6 +93,9 @@ public class NodeStatusUpdaterImpl exten
   private long rmConnectionRetryIntervalMS;
   private boolean waitForEver;
 
+  private Runnable statusUpdaterRunnable;
+  private Thread  statusUpdater;
+
   public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
       NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
     super(NodeStatusUpdaterImpl.class.getName());
@@ -169,6 +174,22 @@ public class NodeStatusUpdaterImpl exten
     this.isStopped = true;
     super.stop();
   }
+
+  protected void rebootNodeStatusUpdater() {
+    // Interrupt the updater.
+    this.isStopped = true;
+
+    try {
+      statusUpdater.join();
+      registerWithRM();
+      statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
+      this.isStopped = false;
+      statusUpdater.start();
+      LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
+    } catch (Exception e) {
+      throw new AvroRuntimeException(e);
+    }
+  }
   
   private boolean isSecurityEnabled() {
     return UserGroupInformation.isSecurityEnabled();
@@ -188,7 +209,8 @@ public class NodeStatusUpdaterImpl exten
         conf);
   }
 
-  private void registerWithRM() throws YarnRemoteException {
+  @VisibleForTesting
+  protected void registerWithRM() throws YarnRemoteException {
     Configuration conf = getConfig();
     rmConnectWaitMS =
         conf.getInt(
@@ -312,7 +334,7 @@ public class NodeStatusUpdaterImpl exten
     return appList;
   }
 
-  private NodeStatus getNodeStatus() {
+  public NodeStatus getNodeStatusAndUpdateContainersInContext() {
 
     NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
     nodeStatus.setNodeId(this.nodeId);
@@ -387,7 +409,7 @@ public class NodeStatusUpdaterImpl exten
 
   protected void startStatusUpdater() {
 
-    new Thread("Node Status Updater") {
+    statusUpdaterRunnable = new Runnable() {
       @Override
       @SuppressWarnings("unchecked")
       public void run() {
@@ -398,7 +420,7 @@ public class NodeStatusUpdaterImpl exten
             NodeHeartbeatResponse response = null;
             int rmRetryCount = 0;
             long waitStartTime = System.currentTimeMillis();
-            NodeStatus nodeStatus = getNodeStatus();
+            NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext();
             nodeStatus.setResponseId(lastHeartBeatID);
             
             NodeHeartbeatRequest request = recordFactory
@@ -453,11 +475,11 @@ public class NodeStatusUpdaterImpl exten
                   new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
               break;
             }
-            if (response.getNodeAction() == NodeAction.REBOOT) {
+            if (response.getNodeAction() == NodeAction.RESYNC) {
               LOG.info("Node is out of sync with ResourceManager,"
                   + " hence rebooting.");
               dispatcher.getEventHandler().handle(
-                  new NodeManagerEvent(NodeManagerEventType.REBOOT));
+                  new NodeManagerEvent(NodeManagerEventType.RESYNC));
               break;
             }
 
@@ -500,6 +522,9 @@ public class NodeStatusUpdaterImpl exten
           }
         }
       }
-    }.start();
+    };
+    statusUpdater =
+        new Thread(statusUpdaterRunnable, "Node Status Updater");
+    statusUpdater.start();
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java?rev=1466753&r1=1466752&r2=1466753&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java Thu Apr 11 02:01:44 2013
@@ -160,7 +160,10 @@ public class TestNodeManagerReboot {
         "container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
         ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
 
-    nm.handle(new NodeManagerEvent(NodeManagerEventType.REBOOT));
+    // restart the NodeManager
+    nm.stop();
+    nm = new MyNodeManager();
+    nm.start();    
 
     numTries = 0;
     while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
@@ -250,26 +253,6 @@ public class TestNodeManagerReboot {
       return delService;
     }
 
-    // mimic part of reboot process
-    @Override
-    public void handle(NodeManagerEvent event) {
-      switch (event.getType()) {
-        case SHUTDOWN:
-          this.stop();
-          break;
-        case REBOOT:
-          this.stop();
-          this.createNewMyNodeManager().start();
-          break;
-        default:
-          LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
-      }
-    }
-
-    private MyNodeManager createNewMyNodeManager() {
-      return new MyNodeManager();
-    }
-
     private YarnConfiguration createNMConfig() {
       YarnConfiguration conf = new YarnConfiguration();
       conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java?rev=1466753&r1=1466752&r2=1466753&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java Thu Apr 11 02:01:44 2013
@@ -28,6 +28,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CyclicBarrier;
 
 import junit.framework.Assert;
 
@@ -49,9 +52,12 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -71,6 +77,7 @@ public class TestNodeManagerShutdown {
       .getRecordFactory(null);
   static final String user = "nobody";
   private FileContext localFS;
+  private CyclicBarrier syncBarrier = new CyclicBarrier(2);
 
   @Before
   public void setup() throws UnsupportedFileSystemException {
@@ -91,11 +98,64 @@ public class TestNodeManagerShutdown {
     NodeManager nm = getNodeManager();
     nm.init(createNMConfig());
     nm.start();
+    startContainers(nm);
     
+    final int MAX_TRIES=20;
+    int numTries = 0;
+    while (!processStartFile.exists() && numTries < MAX_TRIES) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException ex) {ex.printStackTrace();}
+      numTries++;
+    }
+    
+    nm.stop();
+    
+    // Now verify the contents of the file
+    // Script generates a message when it receives a sigterm
+    // so we look for that
+    BufferedReader reader =
+        new BufferedReader(new FileReader(processStartFile));
+
+    boolean foundSigTermMessage = false;
+    while (true) {
+      String line = reader.readLine();
+      if (line == null) {
+        break;
+      }
+      if (line.contains("SIGTERM")) {
+        foundSigTermMessage = true;
+        break;
+      }
+    }
+    Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
+    reader.close();
+  }
+  
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testKillContainersOnResync() throws IOException, InterruptedException {
+    NodeManager nm = new TestNodeManager();
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    nm.start();
+    startContainers(nm);
+
+    assert ((TestNodeManager) nm).getNMRegistrationCount() == 1;
+    nm.getNMDispatcher().getEventHandler().
+        handle( new NodeManagerEvent(NodeManagerEventType.RESYNC));
+    try {
+      syncBarrier.await();
+    } catch (BrokenBarrierException e) {
+    }
+    assert ((TestNodeManager) nm).getNMRegistrationCount() == 2;
+  }
+
+  private void startContainers(NodeManager nm) throws IOException {
     ContainerManagerImpl containerManager = nm.getContainerManager();
     File scriptFile = createUnhaltingScriptFile();
     
-    ContainerLaunchContext containerLaunchContext = 
+    ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
 
     // Construct the Container-id
@@ -127,7 +187,8 @@ public class TestNodeManagerShutdown {
     containerLaunchContext.setResource(recordFactory
         .newRecordInstance(Resource.class));
     containerLaunchContext.getResource().setMemory(1024);
-    StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
+    StartContainerRequest startRequest =
+        recordFactory.newRecordInstance(StartContainerRequest.class);
     startRequest.setContainerLaunchContext(containerLaunchContext);
     containerManager.startContainer(startRequest);
     
@@ -137,37 +198,6 @@ public class TestNodeManagerShutdown {
     ContainerStatus containerStatus =
         containerManager.getContainerStatus(request).getStatus();
     Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
-    
-    final int MAX_TRIES=20;
-    int numTries = 0;
-    while (!processStartFile.exists() && numTries < MAX_TRIES) {
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException ex) {ex.printStackTrace();}
-      numTries++;
-    }
-    
-    nm.stop();
-    
-    // Now verify the contents of the file
-    // Script generates a message when it receives a sigterm
-    // so we look for that
-    BufferedReader reader =
-        new BufferedReader(new FileReader(processStartFile));
-
-    boolean foundSigTermMessage = false;
-    while (true) {
-      String line = reader.readLine();
-      if (line == null) {
-        break;
-      }
-      if (line.contains("SIGTERM")) {
-        foundSigTermMessage = true;
-        break;
-      }
-    }
-    Assert.assertTrue("Did not find sigterm message", foundSigTermMessage);
-    reader.close();
   }
   
   private ContainerId createContainerId() {
@@ -226,4 +256,48 @@ public class TestNodeManagerShutdown {
       }
     };
   }
+
+  class TestNodeManager extends NodeManager {
+
+    private int registrationCount = 0;
+
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      return new TestNodeStatusUpdaterImpl(context, dispatcher,
+          healthChecker, metrics);
+    }
+
+    public int getNMRegistrationCount() {
+      return registrationCount;
+    }
+
+    class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater {
+
+      public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
+          NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+        super(context, dispatcher, healthChecker, metrics);
+      }
+
+      @Override
+      protected void registerWithRM() throws YarnRemoteException {
+        super.registerWithRM();
+        registrationCount++;
+      }
+
+      @Override
+      protected void rebootNodeStatusUpdater() {
+        ConcurrentMap<ContainerId, Container> containers =
+            getNMContext().getContainers();
+        // ensure that containers are empty before restart nodeStatusUpdater
+        Assert.assertTrue(containers.isEmpty());
+        super.rebootNodeStatusUpdater();
+        try {
+          syncBarrier.await();
+        } catch (InterruptedException e) {
+        } catch (BrokenBarrierException e) {
+        }
+      }
+    }
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1466753&r1=1466752&r2=1466753&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Thu Apr 11 02:01:44 2013
@@ -99,7 +99,6 @@ public class TestNodeStatusUpdater {
   private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
   private final Configuration conf = createNMConfig();
   private NodeManager nm;
-  protected NodeManager rebootedNodeManager;
   private boolean containerStatusBackupSuccessfully = true;
   private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
 
@@ -663,8 +662,8 @@ public class TestNodeStatusUpdater {
       }
       
       @Override
-      protected void cleanupContainers() {
-        super.cleanupContainers();
+      protected void cleanupContainers(NodeManagerEventType eventType) {
+        super.cleanupContainers(NodeManagerEventType.SHUTDOWN);
         numCleanups.incrementAndGet();
       }
     };
@@ -718,50 +717,6 @@ public class TestNodeStatusUpdater {
   }
 
   @Test
-  public void testNodeReboot() throws Exception {
-    nm = getNodeManager(NodeAction.REBOOT);
-    YarnConfiguration conf = createNMConfig();
-    nm.init(conf);
-    Assert.assertEquals(STATE.INITED, nm.getServiceState());
-    nm.start();
-
-    int waitCount = 0;
-    while (heartBeatID < 1 && waitCount++ != 20) {
-      Thread.sleep(500);
-    }
-    Assert.assertFalse(heartBeatID < 1);
-
-    // NM takes a while to reach the STOPPED state.
-    waitCount = 0;
-    while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
-      LOG.info("Waiting for NM to stop..");
-      Thread.sleep(1000);
-    }
-    Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
-    
-    waitCount = 0;
-    while (null == rebootedNodeManager && waitCount++ != 20) {
-      LOG.info("Waiting for NM to reinitialize..");
-      Thread.sleep(1000);
-    }
-      
-    waitCount = 0;
-    while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) {
-      LOG.info("Waiting for NM to start..");
-      Thread.sleep(1000);
-    }
-    Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState());
-
-    rebootedNodeManager.stop();
-    waitCount = 0;
-    while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
-      LOG.info("Waiting for NM to stop..");
-      Thread.sleep(1000);
-    }
-    Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState());
-  }
-  
-  @Test
   public void testNMShutdownForRegistrationFailure() {
 
     nm = new NodeManager() {
@@ -1108,12 +1063,6 @@ public class TestNodeStatusUpdater {
         myNodeStatusUpdater.resourceTracker = myResourceTracker2;
         return myNodeStatusUpdater;
       }
-
-      @Override
-      NodeManager createNewNodeManager() {
-        rebootedNodeManager = getNodeManager(NodeAction.NORMAL);
-        return rebootedNodeManager;
-      }
     };
   }
 }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1466753&r1=1466752&r2=1466753&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Thu Apr 11 02:01:44 2013
@@ -73,13 +73,13 @@ public class ResourceTrackerService exte
   private Server server;
   private InetSocketAddress resourceTrackerAddress;
 
-  private static final NodeHeartbeatResponse reboot = recordFactory
+  private static final NodeHeartbeatResponse resync = recordFactory
       .newRecordInstance(NodeHeartbeatResponse.class);
   private static final NodeHeartbeatResponse shutDown = recordFactory
   .newRecordInstance(NodeHeartbeatResponse.class);
   
   static {
-    reboot.setNodeAction(NodeAction.REBOOT);
+    resync.setNodeAction(NodeAction.RESYNC);
 
     shutDown.setNodeAction(NodeAction.SHUTDOWN);
   }
@@ -220,7 +220,7 @@ public class ResourceTrackerService exte
     if (rmNode == null) {
       /* node does not exist */
       LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
-      return reboot;
+      return resync;
     }
 
     // Send ping
@@ -250,7 +250,7 @@ public class ResourceTrackerService exte
       // TODO: Just sending reboot is not enough. Think more.
       this.rmContext.getDispatcher().getEventHandler().handle(
           new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
-      return reboot;
+      return resync;
     }
 
     // Heartbeat response

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1466753&r1=1466752&r2=1466753&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Thu Apr 11 02:01:44 2013
@@ -225,9 +225,9 @@ public class TestRMRestart {
     
     // NM should be rebooted on heartbeat, even first heartbeat for nm2
     NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
-    Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
+    Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
     hbResponse = nm2.nodeHeartbeat(true);
-    Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
+    Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
     
     // new NM to represent NM re-register
     nm1 = rm2.registerNode("h1:1234", 15120);
@@ -235,9 +235,9 @@ public class TestRMRestart {
 
     // verify no more reboot response sent
     hbResponse = nm1.nodeHeartbeat(true);
-    Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction());
+    Assert.assertTrue(NodeAction.RESYNC != hbResponse.getNodeAction());
     hbResponse = nm2.nodeHeartbeat(true);
-    Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction());
+    Assert.assertTrue(NodeAction.RESYNC != hbResponse.getNodeAction());
     
     // assert app1 attempt is saved
     attempt1 = loadedApp1.getCurrentAppAttempt();

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java?rev=1466753&r1=1466752&r2=1466753&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java Thu Apr 11 02:01:44 2013
@@ -282,7 +282,7 @@ public class TestResourceTrackerService 
 
     nodeHeartbeat = nm2.nodeHeartbeat(
       new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
-    Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction()));
+    Assert.assertTrue(NodeAction.RESYNC.equals(nodeHeartbeat.getNodeAction()));
     checkRebootedNMCount(rm, ++initialMetricCount);
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java?rev=1466753&r1=1466752&r2=1466753&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java Thu Apr 11 02:01:44 2013
@@ -130,6 +130,6 @@ public class TestRMNMRPCResponseId {
 
     nodeStatus.setResponseId(0);
     response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest);
-    Assert.assertTrue(NodeAction.REBOOT.equals(response.getNodeAction()));
+    Assert.assertTrue(NodeAction.RESYNC.equals(response.getNodeAction()));
   }
 }
\ No newline at end of file