You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sz...@apache.org on 2013/11/09 02:34:24 UTC

svn commit: r1540239 - in /hadoop/common/branches/HDFS-2832/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/ha...

Author: szetszwo
Date: Sat Nov  9 01:34:22 2013
New Revision: 1540239

URL: http://svn.apache.org/r1540239
Log:
merge of r1535792 through r1540238 from trunk.

Modified:
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt Sat Nov  9 01:34:22 2013
@@ -88,6 +88,9 @@ Release 2.3.0 - UNRELEASED
     YARN-1323. Set HTTPS webapp address along with other RPC addresses in HAUtil
     (Karthik Kambatla via Sandy Ryza)
 
+    YARN-1121. Changed ResourceManager's state-store to drain all events on
+    shut-down. (Jian He via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java Sat Nov  9 01:34:22 2013
@@ -49,6 +49,19 @@ public class AsyncDispatcher extends Abs
   private final BlockingQueue<Event> eventQueue;
   private volatile boolean stopped = false;
 
+  // Configuration flag for enabling/disabling draining dispatcher's events on
+  // stop functionality.
+  private volatile boolean drainEventsOnStop = false;
+
+  // Indicates all the remaining dispatcher's events on stop have been drained
+  // and processed.
+  private volatile boolean drained = true;
+
+  // For drainEventsOnStop enabled only, block newly coming events into the
+  // queue while stopping.
+  private volatile boolean blockNewEvents = false;
+  private EventHandler handlerInstance = null;
+
   private Thread eventHandlingThread;
   protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
   private boolean exitOnDispatchException;
@@ -68,6 +81,7 @@ public class AsyncDispatcher extends Abs
       @Override
       public void run() {
         while (!stopped && !Thread.currentThread().isInterrupted()) {
+          drained = eventQueue.isEmpty();
           Event event;
           try {
             event = eventQueue.take();
@@ -102,8 +116,19 @@ public class AsyncDispatcher extends Abs
     eventHandlingThread.start();
   }
 
+  public void setDrainEventsOnStop() {
+    drainEventsOnStop = true;
+  }
+
   @Override
   protected void serviceStop() throws Exception {
+    if (drainEventsOnStop) {
+      blockNewEvents = true;
+      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
+      while(!drained) {
+        Thread.yield();
+      }
+    }
     stopped = true;
     if (eventHandlingThread != null) {
       eventHandlingThread.interrupt();
@@ -173,11 +198,19 @@ public class AsyncDispatcher extends Abs
 
   @Override
   public EventHandler getEventHandler() {
-    return new GenericEventHandler();
+    if (handlerInstance == null) {
+      handlerInstance = new GenericEventHandler();
+    }
+    return handlerInstance;
   }
 
   class GenericEventHandler implements EventHandler<Event> {
     public void handle(Event event) {
+      if (blockNewEvents) {
+        return;
+      }
+      drained = false;
+
       /* all this method does is enqueue all the events onto the queue */
       int qSize = eventQueue.size();
       if (qSize !=0 && qSize %1000 == 0) {

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Sat Nov  9 01:34:22 2013
@@ -261,17 +261,20 @@ public abstract class RMStateStore exten
   }
   
   AsyncDispatcher dispatcher;
-  
-  public synchronized void serviceInit(Configuration conf) throws Exception{    
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception{
     // create async handler
     dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.register(RMStateStoreEventType.class, 
                         new ForwardingEventHandler());
+    dispatcher.setDrainEventsOnStop();
     initInternal(conf);
   }
-  
-  protected synchronized void serviceStart() throws Exception {
+
+  @Override
+  protected void serviceStart() throws Exception {
     dispatcher.start();
     startInternal();
   }
@@ -288,11 +291,12 @@ public abstract class RMStateStore exten
    */
   protected abstract void startInternal() throws Exception;
 
-  public synchronized void serviceStop() throws Exception {
+  @Override
+  protected void serviceStop() throws Exception {
     closeInternal();
     dispatcher.stop();
   }
-  
+
   /**
    * Derived classes close themselves using this method.
    * The base class will be closed and the event dispatcher will be shutdown 
@@ -509,8 +513,7 @@ public abstract class RMStateStore exten
   }
 
   // Dispatcher related code
-  
-  private synchronized void handleStoreEvent(RMStateStoreEvent event) {
+  protected void handleStoreEvent(RMStateStoreEvent event) {
     if (event.getType().equals(RMStateStoreEventType.STORE_APP)
         || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
       ApplicationState appState = null;

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Sat Nov  9 01:34:22 2013
@@ -163,6 +163,14 @@ public class MockRM extends ResourceMana
   public RMApp submitApp(int masterMemory, String name, String user,
       Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
       int maxAppAttempts, Credentials ts, String appType) throws Exception {
+    return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+      maxAppAttempts, ts, appType, true);
+  }
+
+  public RMApp submitApp(int masterMemory, String name, String user,
+      Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
+      int maxAppAttempts, Credentials ts, String appType,
+      boolean waitForAccepted) throws Exception {
     ApplicationClientProtocol client = getClientRMService();
     GetNewApplicationResponse resp = client.getNewApplication(Records
         .newRecord(GetNewApplicationRequest.class));
@@ -222,7 +230,9 @@ public class MockRM extends ResourceMana
     }.setClientReq(client, req);
     fakeUser.doAs(action);
     // make sure app is immediately available after submit
-    waitForState(appId, RMAppState.ACCEPTED);
+    if (waitForAccepted) {
+      waitForState(appId, RMAppState.ACCEPTED);
+    }
     return getRMContext().getRMApps().get(appId);
   }
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1540239&r1=1540238&r2=1540239&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Sat Nov  9 01:34:22 2013
@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -1062,6 +1063,65 @@ public class TestRMRestart {
     rm2.stop();
   }
 
+  @Test
+  public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception {
+    MemoryRMStateStore memStore = new MemoryRMStateStore() {
+      volatile boolean wait = true;
+      @Override
+      public void serviceStop() throws Exception {
+        // Unblock app saving request.
+        wait = false;
+        super.serviceStop();
+      }
+
+      @Override
+      protected void handleStoreEvent(RMStateStoreEvent event) {
+        // Block app saving request.
+        while (wait);
+        super.handleStoreEvent(event);
+      }
+    };
+    memStore.init(conf);
+
+    // start RM
+    final MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+
+    // create apps.
+    final ArrayList<RMApp> appList = new ArrayList<RMApp>();
+    final int NUM_APPS = 5;
+
+    for (int i = 0; i < NUM_APPS; i++) {
+      RMApp app = rm1.submitApp(200, "name", "user",
+            new HashMap<ApplicationAccessType, String>(), false,
+            "default", -1, null, "MAPREDUCE", false);
+      appList.add(app);
+      rm1.waitForState(app.getApplicationId(), RMAppState.NEW_SAVING);
+    }
+    // all apps's saving request are now enqueued to RMStateStore's dispatcher
+    // queue, and will be processed once rm.stop() is called.
+
+    // Nothing exist in state store before stop is called.
+    Map<ApplicationId, ApplicationState> rmAppState =
+        memStore.getState().getApplicationState();
+    Assert.assertTrue(rmAppState.size() == 0);
+
+    // stop rm
+    rm1.stop();
+
+    // Assert app info is still saved even if stop is called with pending saving
+    // request on dispatcher.
+    for (RMApp app : appList) {
+      ApplicationState appState = rmAppState.get(app.getApplicationId());
+      Assert.assertNotNull(appState);
+      Assert.assertEquals(0, appState.getAttemptCount());
+      Assert.assertEquals(appState.getApplicationSubmissionContext()
+        .getApplicationId(), app.getApplicationSubmissionContext()
+        .getApplicationId());
+    }
+    Assert.assertTrue(rmAppState.size() == NUM_APPS);
+  }
+
   public static class TestSecurityMockRM extends MockRM {
 
     public TestSecurityMockRM(Configuration conf, RMStateStore store) {