You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/11/06 09:04:42 UTC

[31/43] git commit: YARN-2579. Fixed a deadlock issue when EmbeddedElectorService and FatalEventDispatcher try to transition RM to StandBy at the same time. Contributed by Rohith Sharmaks

YARN-2579. Fixed a deadlock issue when EmbeddedElectorService and FatalEventDispatcher try to transition RM to StandBy at the same time. Contributed by Rohith Sharmaks.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/395275af
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/395275af
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/395275af

Branch: refs/heads/HDFS-EC
Commit: 395275af8622c780b9071c243422b0780e096202
Parents: 8549fa5
Author: Jian He <ji...@apache.org>
Authored: Wed Nov 5 16:59:54 2014 -0800
Committer: Jian He <ji...@apache.org>
Committed: Wed Nov 5 16:59:54 2014 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  4 ++
 .../hadoop/yarn/client/TestRMFailover.java      |  8 +--
 .../resourcemanager/RMFatalEventType.java       |  1 -
 .../server/resourcemanager/ResourceManager.java | 49 +++++++---------
 .../resourcemanager/recovery/RMStateStore.java  | 24 ++++++--
 .../yarn/server/resourcemanager/TestRMHA.java   | 62 ++++++++++++++++++++
 6 files changed, 109 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/395275af/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8adde9b..887d1d4 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -868,6 +868,10 @@ Release 2.6.0 - UNRELEASED
     YARN-2805. Fixed ResourceManager to load HA configs correctly before kerberos
     login. (Wangda Tan via vinodkv)
 
+    YARN-2579. Fixed a deadlock issue when EmbeddedElectorService and
+    FatalEventDispatcher try to transition RM to StandBy at the same time.
+    (Rohith Sharmaks via jianhe)
+
 Release 2.5.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/395275af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
index 0440f1d..0634cc3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java
@@ -43,8 +43,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
-import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
 import org.junit.After;
@@ -173,7 +171,6 @@ public class TestRMFailover extends ClientBaseWithFixes {
     verifyConnections();
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testAutomaticFailover()
       throws YarnException, InterruptedException, IOException {
@@ -196,10 +193,7 @@ public class TestRMFailover extends ClientBaseWithFixes {
     // so it transitions to standby.
     ResourceManager rm = cluster.getResourceManager(
         cluster.getActiveRMIndex());
-    RMFatalEvent event =
-        new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED,
-            "Fake RMFatalEvent");
-    rm.getRMContext().getDispatcher().getEventHandler().handle(event);
+    rm.handleTransitionToStandBy();
     int maxWaitingAttempts = 2000;
     while (maxWaitingAttempts-- > 0 ) {
       if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/395275af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java
index 0629c70..789c018 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public enum RMFatalEventType {
   // Source <- Store
-  STATE_STORE_FENCED,
   STATE_STORE_OP_FAILED,
 
   // Source <- Embedded Elector

http://git-wip-us.apache.org/repos/asf/hadoop/blob/395275af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 6adc73a..4051054 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -269,6 +269,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   @VisibleForTesting
   protected void setRMStateStore(RMStateStore rmStore) {
     rmStore.setRMDispatcher(rmDispatcher);
+    rmStore.setResourceManager(this);
     rmContext.setStateStore(rmStore);
   }
 
@@ -397,11 +398,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
     private EventHandler<SchedulerEvent> schedulerDispatcher;
     private ApplicationMasterLauncher applicationMasterLauncher;
     private ContainerAllocationExpirer containerAllocationExpirer;
-
+    private ResourceManager rm;
     private boolean recoveryEnabled;
 
-    RMActiveServices() {
+    RMActiveServices(ResourceManager rm) {
       super("RMActiveServices");
+      this.rm = rm;
     }
 
     @Override
@@ -449,6 +451,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
       try {
         rmStore.init(conf);
         rmStore.setRMDispatcher(rmDispatcher);
+        rmStore.setResourceManager(rm);
       } catch (Exception e) {
         // the Exception from stateStore.init() needs to be handled for
         // HA and we need to give up master status if we got fenced
@@ -729,39 +732,31 @@ public class ResourceManager extends CompositeService implements Recoverable {
   @Private
   public static class RMFatalEventDispatcher
       implements EventHandler<RMFatalEvent> {
-    private final RMContext rmContext;
-    private final ResourceManager rm;
-
-    public RMFatalEventDispatcher(
-        RMContext rmContext, ResourceManager resourceManager) {
-      this.rmContext = rmContext;
-      this.rm = resourceManager;
-    }
 
     @Override
     public void handle(RMFatalEvent event) {
       LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " +
           event.getType().name() + ". Cause:\n" + event.getCause());
 
-      if (event.getType() == RMFatalEventType.STATE_STORE_FENCED) {
-        LOG.info("RMStateStore has been fenced");
-        if (rmContext.isHAEnabled()) {
-          try {
-            // Transition to standby and reinit active services
-            LOG.info("Transitioning RM to Standby mode");
-            rm.transitionToStandby(true);
-            rm.adminService.resetLeaderElection();
-            return;
-          } catch (Exception e) {
-            LOG.fatal("Failed to transition RM to Standby mode.");
-          }
-        }
-      }
-
       ExitUtil.terminate(1, event.getCause());
     }
   }
 
+  public void handleTransitionToStandBy() {
+    if (rmContext.isHAEnabled()) {
+      try {
+        // Transition to standby and reinit active services
+        LOG.info("Transitioning RM to Standby mode");
+        transitionToStandby(true);
+        adminService.resetLeaderElection();
+        return;
+      } catch (Exception e) {
+        LOG.fatal("Failed to transition RM to Standby mode.");
+        ExitUtil.terminate(1, e);
+      }
+    }
+  }
+
   @Private
   public static final class ApplicationEventDispatcher implements
       EventHandler<RMAppEvent> {
@@ -990,7 +985,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
    * @throws Exception
    */
   protected void createAndInitActiveServices() throws Exception {
-    activeServices = new RMActiveServices();
+    activeServices = new RMActiveServices(this);
     activeServices.init(conf);
   }
 
@@ -1227,7 +1222,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   private Dispatcher setupDispatcher() {
     Dispatcher dispatcher = createDispatcher();
     dispatcher.register(RMFatalEventType.class,
-        new ResourceManager.RMFatalEventDispatcher(this.rmContext, this));
+        new ResourceManager.RMFatalEventDispatcher());
     return dispatcher;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/395275af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 973fe54..beac403 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -87,6 +88,7 @@ public abstract class RMStateStore extends AbstractService {
       "AMRMTokenSecretManagerRoot";
   protected static final String VERSION_NODE = "RMVersionNode";
   protected static final String EPOCH_NODE = "EpochNode";
+  private ResourceManager resourceManager;
 
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
 
@@ -818,13 +820,15 @@ public abstract class RMStateStore extends AbstractService {
    * @param failureCause the exception due to which the operation failed
    */
   protected void notifyStoreOperationFailed(Exception failureCause) {
-    RMFatalEventType type;
     if (failureCause instanceof StoreFencedException) {
-      type = RMFatalEventType.STATE_STORE_FENCED;
+      Thread standByTransitionThread =
+          new Thread(new StandByTransitionThread());
+      standByTransitionThread.setName("StandByTransitionThread Handler");
+      standByTransitionThread.start();
     } else {
-      type = RMFatalEventType.STATE_STORE_OP_FAILED;
+      rmDispatcher.getEventHandler().handle(
+        new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED, failureCause));
     }
-    rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause));
   }
  
   @SuppressWarnings("unchecked")
@@ -866,4 +870,16 @@ public abstract class RMStateStore extends AbstractService {
    * @throws Exception
    */
   public abstract void deleteStore() throws Exception;
+
+  public void setResourceManager(ResourceManager rm) {
+    this.resourceManager = rm;
+  }
+
+  private class StandByTransitionThread implements Runnable {
+    @Override
+    public void run() {
+      LOG.info("RMStateStore has been fenced");
+      resourceManager.handleTransitionToStandBy();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/395275af/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index 8cef4c9..c6d7d09 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFencedException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -451,6 +452,67 @@ public class TestRMHA {
     checkActiveRMFunctionality();
   }
 
+  @Test(timeout = 90000)
+  public void testTransitionedToStandbyShouldNotHang() throws Exception {
+    configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    Configuration conf = new YarnConfiguration(configuration);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore() {
+      @Override
+      public synchronized void updateApplicationState(ApplicationState appState) {
+        notifyStoreOperationFailed(new StoreFencedException());
+      }
+    };
+    memStore.init(conf);
+    rm = new MockRM(conf, memStore) {
+      @Override
+      void stopActiveServices() throws Exception {
+        Thread.sleep(10000);
+        super.stopActiveServices();
+      }
+    };
+    rm.init(conf);
+    final StateChangeRequestInfo requestInfo =
+        new StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    assertEquals(STATE_ERR, HAServiceState.INITIALIZING, rm.adminService
+        .getServiceStatus().getState());
+    assertFalse("RM is ready to become active before being started",
+        rm.adminService.getServiceStatus().isReadyToBecomeActive());
+    checkMonitorHealth();
+
+    rm.start();
+    checkMonitorHealth();
+    checkStandbyRMFunctionality();
+
+    // 2. Transition to Active.
+    rm.adminService.transitionToActive(requestInfo);
+
+    // 3. Try Transition to standby
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          rm.transitionToStandby(true);
+        } catch (IOException e) {
+          e.printStackTrace();
+        } catch (Exception e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+    });
+    t.start();
+
+    rm.getRMContext().getStateStore().updateApplicationState(null);
+    t.join(); // wait for thread to finish
+
+    rm.adminService.transitionToStandby(requestInfo);
+    checkStandbyRMFunctionality();
+    rm.stop();
+  }
+
   public void innerTestHAWithRMHostName(boolean includeBindHost) {
     //this is run two times, with and without a bind host configured
     if (includeBindHost) {