You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ka...@apache.org on 2014/02/12 15:11:38 UTC

svn commit: r1567629 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/

Author: kasha
Date: Wed Feb 12 14:11:37 2014
New Revision: 1567629

URL: http://svn.apache.org/r1567629
Log:
YARN-1641. ZK store should attempt a write periodically to ensure it is still Active. (kasha)

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1567629&r1=1567628&r2=1567629&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Wed Feb 12 14:11:37 2014
@@ -161,6 +161,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1706. Created an utility method to dump timeline records to JSON
     strings. (zjshen)
 
+    YARN-1641. ZK store should attempt a write periodically to ensure it is 
+    still Active. (kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1567629&r1=1567628&r2=1567629&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Wed Feb 12 14:11:37 2014
@@ -676,11 +676,11 @@ public abstract class RMStateStore exten
 
   @SuppressWarnings("unchecked")
   /**
-   * In {#handleStoreEvent}, this method is called to notify the
-   * ResourceManager that the store operation has failed.
+   * This method is called to notify the ResourceManager that the store
+   * operation has failed.
    * @param failureCause the exception due to which the operation failed
    */
-  private void notifyStoreOperationFailed(Exception failureCause) {
+  protected void notifyStoreOperationFailed(Exception failureCause) {
     RMFatalEventType type;
     if (failureCause instanceof StoreFencedException) {
       type = RMFatalEventType.STATE_STORE_FENCED;

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1567629&r1=1567628&r2=1567629&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Wed Feb 12 14:11:37 2014
@@ -137,6 +137,7 @@ public class ZKRMStateStore extends RMSt
   private String fencingNodePath;
   private Op createFencingNodePathOp;
   private Op deleteFencingNodePathOp;
+  private Thread verifyActiveStatusThread;
   private String zkRootNodeUsername;
   private final String zkRootNodePassword = Long.toString(random.nextLong());
 
@@ -258,6 +259,8 @@ public class ZKRMStateStore extends RMSt
     createRootDir(zkRootNodePath);
     if (HAUtil.isHAEnabled(getConfig())){
       fence();
+      verifyActiveStatusThread = new VerifyActiveStatusThread();
+      verifyActiveStatusThread.start();
     }
     createRootDir(rmAppRoot);
     createRootDir(rmDTSecretManagerRoot);
@@ -350,6 +353,10 @@ public class ZKRMStateStore extends RMSt
 
   @Override
   protected synchronized void closeInternal() throws Exception {
+    if (verifyActiveStatusThread != null) {
+      verifyActiveStatusThread.interrupt();
+      verifyActiveStatusThread.join(1000);
+    }
     closeZkClients();
   }
 
@@ -856,6 +863,32 @@ public class ZKRMStateStore extends RMSt
     }.runWithRetries();
   }
 
+  /**
+   * Helper class that periodically attempts creating a znode to ensure that
+   * this RM continues to be the Active.
+   */
+  private class VerifyActiveStatusThread extends Thread {
+    private List<Op> emptyOpList = new ArrayList<Op>();
+
+    VerifyActiveStatusThread() {
+      super(VerifyActiveStatusThread.class.getName());
+    }
+
+    public void run() {
+      try {
+        while (true) {
+          doMultiWithRetries(emptyOpList);
+          Thread.sleep(zkSessionTimeout);
+        }
+      } catch (InterruptedException ie) {
+        LOG.info(VerifyActiveStatusThread.class.getName() + " thread " +
+            "interrupted! Exiting!");
+      } catch (Exception e) {
+        notifyStoreOperationFailed(new StoreFencedException());
+      }
+    }
+  }
+
   private abstract class ZKAction<T> {
     // run() expects synchronization on ZKRMStateStore.this
     abstract T run() throws KeeperException, InterruptedException;

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java?rev=1567629&r1=1567628&r2=1567629&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java Wed Feb 12 14:11:37 2014
@@ -23,10 +23,7 @@ import static org.junit.Assert.assertTru
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,15 +31,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
@@ -54,6 +44,7 @@ import org.junit.Test;
 public class TestZKRMStateStore extends RMStateStoreTestBase {
 
   public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
+  private static final int ZK_TIMEOUT_MS = 1000;
 
   class TestZKRMStateStoreTester implements RMStateStoreHelper {
 
@@ -141,6 +132,7 @@ public class TestZKRMStateStore extends 
     conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
     conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
     conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+    conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
     conf.set(YarnConfiguration.RM_HA_ID, rmId);
     for (String rpcAddress : YarnConfiguration.RM_SERVICES_ADDRESS_CONF_KEYS) {
       for (String id : HAUtil.getRMHAIds(conf)) {
@@ -182,26 +174,7 @@ public class TestZKRMStateStore extends 
         HAServiceProtocol.HAServiceState.ACTIVE,
         rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
 
-    // Submitting an application to RM1 to trigger a state store operation.
-    // RM1 should realize that it got fenced and is not the Active RM anymore.
-    Map mockMap = mock(Map.class);
-    ApplicationSubmissionContext asc =
-        ApplicationSubmissionContext.newInstance(
-            ApplicationId.newInstance(1000, 1),
-            "testApplication", // app Name
-            "default", // queue name
-            Priority.newInstance(0),
-            ContainerLaunchContext.newInstance(mockMap, mockMap,
-                new ArrayList<String>(), mockMap, mock(ByteBuffer.class),
-                mockMap),
-            false, // unmanaged AM
-            true, // cancelTokens
-            1, // max app attempts
-            Resource.newInstance(1024, 1));
-    ClientRMService rmService = rm1.getClientRMService();
-    rmService.submitApplication(SubmitApplicationRequest.newInstance(asc));
-
-    for (int i = 0; i < 30; i++) {
+    for (int i = 0; i < ZK_TIMEOUT_MS / 50; i++) {
       if (HAServiceProtocol.HAServiceState.ACTIVE ==
           rm1.getRMContext().getRMAdminService().getServiceStatus().getState()) {
         Thread.sleep(100);