You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/12/03 06:24:15 UTC

[43/50] [abbrv] hadoop git commit: YARN-2136. Changed RMStateStore to ignore store opearations when fenced. Contributed by Varun Saxena

YARN-2136. Changed RMStateStore to ignore store opearations when fenced. Contributed by Varun Saxena


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52bcefca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52bcefca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52bcefca

Branch: refs/heads/HDFS-EC
Commit: 52bcefca8bb13d3757009f1f08203e7dca3b1e16
Parents: 26319ba
Author: Jian He <ji...@apache.org>
Authored: Tue Dec 2 10:53:55 2014 -0800
Committer: Jian He <ji...@apache.org>
Committed: Tue Dec 2 10:54:48 2014 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../resourcemanager/recovery/RMStateStore.java  | 62 +++++++++++--
 .../recovery/RMStateStoreEventType.java         |  3 +-
 .../recovery/ZKRMStateStore.java                |  8 ++
 .../recovery/TestZKRMStateStore.java            | 94 ++++++++++++++++++++
 5 files changed, 161 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d95679e..3744a1ecb 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -109,6 +109,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2907. SchedulerNode#toString should print all resource detail instead 
     of only memory. (Rohith via junping_du)
 
+    YARN-2136. Changed RMStateStore to ignore store opearations when fenced.
+    (Varun Saxena via jianhe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 35a54c3..00e1dfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -26,6 +27,7 @@ import java.util.TreeMap;
 
 import javax.crypto.SecretKey;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -88,7 +90,8 @@ public abstract class RMStateStore extends AbstractService {
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
 
   private enum RMStateStoreState {
-    DEFAULT
+    ACTIVE,
+    FENCED
   };
 
   private static final StateMachineFactory<RMStateStore,
@@ -99,17 +102,27 @@ public abstract class RMStateStore extends AbstractService {
                                                     RMStateStoreState,
                                                     RMStateStoreEventType,
                                                     RMStateStoreEvent>(
-      RMStateStoreState.DEFAULT)
-      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+      RMStateStoreState.ACTIVE)
+      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
           RMStateStoreEventType.STORE_APP, new StoreAppTransition())
-      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
           RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
-      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
           RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
-      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
           RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
-      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
-          RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition());
+      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+          RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition())
+      .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
+          RMStateStoreEventType.FENCED)
+      .addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED,
+          EnumSet.of(
+          RMStateStoreEventType.STORE_APP,
+          RMStateStoreEventType.UPDATE_APP,
+          RMStateStoreEventType.REMOVE_APP,
+          RMStateStoreEventType.STORE_APP_ATTEMPT,
+          RMStateStoreEventType.UPDATE_APP_ATTEMPT,
+          RMStateStoreEventType.FENCED));
 
   private final StateMachine<RMStateStoreState,
                              RMStateStoreEventType,
@@ -432,6 +445,11 @@ public abstract class RMStateStore extends AbstractService {
     dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
   }
 
+  public synchronized void updateFencedState() {
+    this.stateMachine.doTransition(RMStateStoreEventType.FENCED,
+         new RMStateStoreEvent(RMStateStoreEventType.FENCED));
+  }
+
   /**
    * Blocking API
    * Derived classes must implement this method to store the state of an 
@@ -494,6 +512,10 @@ public abstract class RMStateStore extends AbstractService {
   public synchronized void storeRMDelegationTokenAndSequenceNumber(
       RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
       int latestSequenceNumber) {
+    if(isFencedState()) {
+      LOG.info("State store is in Fenced state. Can't store RM Delegation Token.");
+      return;
+    }
     try {
       storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
           latestSequenceNumber);
@@ -516,6 +538,10 @@ public abstract class RMStateStore extends AbstractService {
    */
   public synchronized void removeRMDelegationToken(
       RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) {
+    if(isFencedState()) {
+      LOG.info("State store is in Fenced state. Can't remove RM Delegation Token.");
+      return;
+    }
     try {
       removeRMDelegationTokenState(rmDTIdentifier);
     } catch (Exception e) {
@@ -537,6 +563,10 @@ public abstract class RMStateStore extends AbstractService {
   public synchronized void updateRMDelegationTokenAndSequenceNumber(
       RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
       int latestSequenceNumber) {
+    if(isFencedState()) {
+      LOG.info("State store is in Fenced state. Can't update RM Delegation Token.");
+      return;
+    }
     try {
       updateRMDelegationTokenAndSequenceNumberInternal(rmDTIdentifier, renewDate,
           latestSequenceNumber);
@@ -558,6 +588,11 @@ public abstract class RMStateStore extends AbstractService {
    * RMDTSecretManager call this to store the state of a master key
    */
   public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) {
+    if(isFencedState()) {
+      LOG.info("State store is in Fenced state. Can't store RM Delegation " +
+               "Token Master key.");
+      return;
+    }
     try {
       storeRMDTMasterKeyState(delegationKey);
     } catch (Exception e) {
@@ -577,6 +612,11 @@ public abstract class RMStateStore extends AbstractService {
    * RMDTSecretManager call this to remove the state of a master key
    */
   public synchronized void removeRMDTMasterKey(DelegationKey delegationKey) {
+   if(isFencedState()) {
+      LOG.info("State store is in Fenced state. Can't remove RM Delegation " +
+               "Token Master key.");
+      return;
+    }
     try {
       removeRMDTMasterKeyState(delegationKey);
     } catch (Exception e) {
@@ -647,6 +687,11 @@ public abstract class RMStateStore extends AbstractService {
     }
     return credentials;
   }
+  
+  @VisibleForTesting
+  synchronized boolean isFencedState() {
+    return (RMStateStoreState.FENCED == this.stateMachine.getCurrentState());
+  }
 
   // Dispatcher related code
   protected void handleStoreEvent(RMStateStoreEvent event) {
@@ -665,6 +710,7 @@ public abstract class RMStateStore extends AbstractService {
    */
   protected void notifyStoreOperationFailed(Exception failureCause) {
     if (failureCause instanceof StoreFencedException) {
+      updateFencedState();
       Thread standByTransitionThread =
           new Thread(new StandByTransitionThread());
       standByTransitionThread.setName("StandByTransitionThread Handler");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
index 903f4e7..9301bf9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
@@ -23,5 +23,6 @@ public enum RMStateStoreEventType {
   STORE_APP,
   UPDATE_APP,
   UPDATE_APP_ATTEMPT,
-  REMOVE_APP
+  REMOVE_APP,
+  FENCED
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index a19ed30..a718fb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -1017,6 +1017,9 @@ public class ZKRMStateStore extends RMStateStore {
     public void run() {
       try {
         while (true) {
+          if(isFencedState()) { 
+            break;
+          }
           doMultiWithRetries(emptyOpList);
           Thread.sleep(zkSessionTimeout);
         }
@@ -1134,6 +1137,11 @@ public class ZKRMStateStore extends RMStateStore {
   public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
       AMRMTokenSecretManagerState amrmTokenSecretManagerState,
       boolean isUpdate) {
+    if(isFencedState()) {
+      LOG.info("State store is in Fenced state. Can't store/update " +
+               "AMRMToken Secret Manager state.");
+      return;
+    }	
     AMRMTokenSecretManagerState data =
         AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
     byte[] stateData = data.getProto().toByteArray();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 3c7170a..e936677 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -20,22 +20,40 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.List;
 
+import javax.crypto.SecretKey;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 import org.junit.Test;
@@ -191,4 +209,80 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
         HAServiceProtocol.HAServiceState.ACTIVE,
         rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
   }
+  
+  @Test
+  public void testFencedState() throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+	RMStateStore store = zkTester.getRMStateStore();
+   
+    // Move state to FENCED from ACTIVE
+    store.updateFencedState();
+    assertEquals("RMStateStore should have been in fenced state",
+            true, store.isFencedState());    
+
+    long submitTime = System.currentTimeMillis();
+    long startTime = submitTime + 1000;
+
+    // Add a new app
+    RMApp mockApp = mock(RMApp.class);
+    ApplicationSubmissionContext context =
+      new ApplicationSubmissionContextPBImpl();
+    when(mockApp.getSubmitTime()).thenReturn(submitTime);
+    when(mockApp.getStartTime()).thenReturn(startTime);
+    when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
+    when(mockApp.getUser()).thenReturn("test");
+    store.storeNewApplication(mockApp);
+    assertEquals("RMStateStore should have been in fenced state",
+            true, store.isFencedState());
+
+    // Add a new attempt
+    ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
+            new ClientToAMTokenSecretManagerInRM();
+    ApplicationAttemptId attemptId = ConverterUtils
+            .toApplicationAttemptId("appattempt_1234567894321_0001_000001");
+    SecretKey clientTokenMasterKey =
+                clientToAMTokenMgr.createMasterKey(attemptId);
+    RMAppAttemptMetrics mockRmAppAttemptMetrics = 
+         mock(RMAppAttemptMetrics.class);
+    Container container = new ContainerPBImpl();
+    container.setId(ConverterUtils.toContainerId("container_1234567891234_0001_01_000001"));
+    RMAppAttempt mockAttempt = mock(RMAppAttempt.class);
+    when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
+    when(mockAttempt.getMasterContainer()).thenReturn(container);
+    when(mockAttempt.getClientTokenMasterKey())
+        .thenReturn(clientTokenMasterKey);
+    when(mockAttempt.getRMAppAttemptMetrics())
+        .thenReturn(mockRmAppAttemptMetrics);
+    when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
+        .thenReturn(new AggregateAppResourceUsage(0,0));
+    store.storeNewApplicationAttempt(mockAttempt);
+    assertEquals("RMStateStore should have been in fenced state",
+            true, store.isFencedState());
+
+    long finishTime = submitTime + 1000;
+    // Update attempt
+    ApplicationAttemptStateData newAttemptState =
+      ApplicationAttemptStateData.newInstance(attemptId, container,
+            store.getCredentialsFromAppAttempt(mockAttempt),
+            startTime, RMAppAttemptState.FINISHED, "testUrl", 
+            "test", FinalApplicationStatus.SUCCEEDED, 100, 
+            finishTime, 0, 0);
+    store.updateApplicationAttemptState(newAttemptState);
+    assertEquals("RMStateStore should have been in fenced state",
+            true, store.isFencedState());
+
+    // Update app
+    ApplicationStateData appState = ApplicationStateData.newInstance(submitTime, 
+            startTime, context, "test");
+    store.updateApplicationState(appState);
+    assertEquals("RMStateStore should have been in fenced state",
+            true, store.isFencedState());
+
+    // Remove app
+    store.removeApplication(mockApp);
+    assertEquals("RMStateStore should have been in fenced state",
+            true, store.isFencedState());
+ 
+    store.close();
+  }
 }