You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/12/03 06:24:15 UTC
[43/50] [abbrv] hadoop git commit: YARN-2136. Changed RMStateStore to
ignore store operations when fenced. Contributed by Varun Saxena
YARN-2136. Changed RMStateStore to ignore store operations when fenced. Contributed by Varun Saxena
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52bcefca
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52bcefca
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52bcefca
Branch: refs/heads/HDFS-EC
Commit: 52bcefca8bb13d3757009f1f08203e7dca3b1e16
Parents: 26319ba
Author: Jian He <ji...@apache.org>
Authored: Tue Dec 2 10:53:55 2014 -0800
Committer: Jian He <ji...@apache.org>
Committed: Tue Dec 2 10:54:48 2014 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../resourcemanager/recovery/RMStateStore.java | 62 +++++++++++--
.../recovery/RMStateStoreEventType.java | 3 +-
.../recovery/ZKRMStateStore.java | 8 ++
.../recovery/TestZKRMStateStore.java | 94 ++++++++++++++++++++
5 files changed, 161 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d95679e..3744a1ecb 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -109,6 +109,9 @@ Release 2.7.0 - UNRELEASED
YARN-2907. SchedulerNode#toString should print all resource detail instead
of only memory. (Rohith via junping_du)
+ YARN-2136. Changed RMStateStore to ignore store operations when fenced.
+ (Varun Saxena via jianhe)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
index 35a54c3..00e1dfc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -26,6 +27,7 @@ import java.util.TreeMap;
import javax.crypto.SecretKey;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -88,7 +90,8 @@ public abstract class RMStateStore extends AbstractService {
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
private enum RMStateStoreState {
- DEFAULT
+ ACTIVE,
+ FENCED
};
private static final StateMachineFactory<RMStateStore,
@@ -99,17 +102,27 @@ public abstract class RMStateStore extends AbstractService {
RMStateStoreState,
RMStateStoreEventType,
RMStateStoreEvent>(
- RMStateStoreState.DEFAULT)
- .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+ RMStateStoreState.ACTIVE)
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
RMStateStoreEventType.STORE_APP, new StoreAppTransition())
- .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
- .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
- .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
- .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
- RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition());
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.ACTIVE,
+ RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition())
+ .addTransition(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED,
+ RMStateStoreEventType.FENCED)
+ .addTransition(RMStateStoreState.FENCED, RMStateStoreState.FENCED,
+ EnumSet.of(
+ RMStateStoreEventType.STORE_APP,
+ RMStateStoreEventType.UPDATE_APP,
+ RMStateStoreEventType.REMOVE_APP,
+ RMStateStoreEventType.STORE_APP_ATTEMPT,
+ RMStateStoreEventType.UPDATE_APP_ATTEMPT,
+ RMStateStoreEventType.FENCED));
private final StateMachine<RMStateStoreState,
RMStateStoreEventType,
@@ -432,6 +445,11 @@ public abstract class RMStateStore extends AbstractService {
dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
}
+ public synchronized void updateFencedState() {
+ this.stateMachine.doTransition(RMStateStoreEventType.FENCED,
+ new RMStateStoreEvent(RMStateStoreEventType.FENCED));
+ }
+
/**
* Blocking API
* Derived classes must implement this method to store the state of an
@@ -494,6 +512,10 @@ public abstract class RMStateStore extends AbstractService {
public synchronized void storeRMDelegationTokenAndSequenceNumber(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) {
+ if(isFencedState()) {
+ LOG.info("State store is in Fenced state. Can't store RM Delegation Token.");
+ return;
+ }
try {
storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
latestSequenceNumber);
@@ -516,6 +538,10 @@ public abstract class RMStateStore extends AbstractService {
*/
public synchronized void removeRMDelegationToken(
RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber) {
+ if(isFencedState()) {
+ LOG.info("State store is in Fenced state. Can't remove RM Delegation Token.");
+ return;
+ }
try {
removeRMDelegationTokenState(rmDTIdentifier);
} catch (Exception e) {
@@ -537,6 +563,10 @@ public abstract class RMStateStore extends AbstractService {
public synchronized void updateRMDelegationTokenAndSequenceNumber(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
int latestSequenceNumber) {
+ if(isFencedState()) {
+ LOG.info("State store is in Fenced state. Can't update RM Delegation Token.");
+ return;
+ }
try {
updateRMDelegationTokenAndSequenceNumberInternal(rmDTIdentifier, renewDate,
latestSequenceNumber);
@@ -558,6 +588,11 @@ public abstract class RMStateStore extends AbstractService {
* RMDTSecretManager call this to store the state of a master key
*/
public synchronized void storeRMDTMasterKey(DelegationKey delegationKey) {
+ if(isFencedState()) {
+ LOG.info("State store is in Fenced state. Can't store RM Delegation " +
+ "Token Master key.");
+ return;
+ }
try {
storeRMDTMasterKeyState(delegationKey);
} catch (Exception e) {
@@ -577,6 +612,11 @@ public abstract class RMStateStore extends AbstractService {
* RMDTSecretManager call this to remove the state of a master key
*/
public synchronized void removeRMDTMasterKey(DelegationKey delegationKey) {
+ if(isFencedState()) {
+ LOG.info("State store is in Fenced state. Can't remove RM Delegation " +
+ "Token Master key.");
+ return;
+ }
try {
removeRMDTMasterKeyState(delegationKey);
} catch (Exception e) {
@@ -647,6 +687,11 @@ public abstract class RMStateStore extends AbstractService {
}
return credentials;
}
+
+ @VisibleForTesting
+ synchronized boolean isFencedState() {
+ return (RMStateStoreState.FENCED == this.stateMachine.getCurrentState());
+ }
// Dispatcher related code
protected void handleStoreEvent(RMStateStoreEvent event) {
@@ -665,6 +710,7 @@ public abstract class RMStateStore extends AbstractService {
*/
protected void notifyStoreOperationFailed(Exception failureCause) {
if (failureCause instanceof StoreFencedException) {
+ updateFencedState();
Thread standByTransitionThread =
new Thread(new StandByTransitionThread());
standByTransitionThread.setName("StandByTransitionThread Handler");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
index 903f4e7..9301bf9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreEventType.java
@@ -23,5 +23,6 @@ public enum RMStateStoreEventType {
STORE_APP,
UPDATE_APP,
UPDATE_APP_ATTEMPT,
- REMOVE_APP
+ REMOVE_APP,
+ FENCED
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index a19ed30..a718fb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -1017,6 +1017,9 @@ public class ZKRMStateStore extends RMStateStore {
public void run() {
try {
while (true) {
+ if(isFencedState()) {
+ break;
+ }
doMultiWithRetries(emptyOpList);
Thread.sleep(zkSessionTimeout);
}
@@ -1134,6 +1137,11 @@ public class ZKRMStateStore extends RMStateStore {
public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
AMRMTokenSecretManagerState amrmTokenSecretManagerState,
boolean isUpdate) {
+ if(isFencedState()) {
+ LOG.info("State store is in Fenced state. Can't store/update " +
+ "AMRMToken Secret Manager state.");
+ return;
+ }
AMRMTokenSecretManagerState data =
AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
byte[] stateData = data.getProto().toByteArray();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/52bcefca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
index 3c7170a..e936677 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java
@@ -20,22 +20,40 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.List;
+import javax.crypto.SecretKey;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
@@ -191,4 +209,80 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
HAServiceProtocol.HAServiceState.ACTIVE,
rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
}
+
+ @Test
+ public void testFencedState() throws Exception {
+ TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+ RMStateStore store = zkTester.getRMStateStore();
+
+ // Move state to FENCED from ACTIVE
+ store.updateFencedState();
+ assertEquals("RMStateStore should have been in fenced state",
+ true, store.isFencedState());
+
+ long submitTime = System.currentTimeMillis();
+ long startTime = submitTime + 1000;
+
+ // Add a new app
+ RMApp mockApp = mock(RMApp.class);
+ ApplicationSubmissionContext context =
+ new ApplicationSubmissionContextPBImpl();
+ when(mockApp.getSubmitTime()).thenReturn(submitTime);
+ when(mockApp.getStartTime()).thenReturn(startTime);
+ when(mockApp.getApplicationSubmissionContext()).thenReturn(context);
+ when(mockApp.getUser()).thenReturn("test");
+ store.storeNewApplication(mockApp);
+ assertEquals("RMStateStore should have been in fenced state",
+ true, store.isFencedState());
+
+ // Add a new attempt
+ ClientToAMTokenSecretManagerInRM clientToAMTokenMgr =
+ new ClientToAMTokenSecretManagerInRM();
+ ApplicationAttemptId attemptId = ConverterUtils
+ .toApplicationAttemptId("appattempt_1234567894321_0001_000001");
+ SecretKey clientTokenMasterKey =
+ clientToAMTokenMgr.createMasterKey(attemptId);
+ RMAppAttemptMetrics mockRmAppAttemptMetrics =
+ mock(RMAppAttemptMetrics.class);
+ Container container = new ContainerPBImpl();
+ container.setId(ConverterUtils.toContainerId("container_1234567891234_0001_01_000001"));
+ RMAppAttempt mockAttempt = mock(RMAppAttempt.class);
+ when(mockAttempt.getAppAttemptId()).thenReturn(attemptId);
+ when(mockAttempt.getMasterContainer()).thenReturn(container);
+ when(mockAttempt.getClientTokenMasterKey())
+ .thenReturn(clientTokenMasterKey);
+ when(mockAttempt.getRMAppAttemptMetrics())
+ .thenReturn(mockRmAppAttemptMetrics);
+ when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
+ .thenReturn(new AggregateAppResourceUsage(0,0));
+ store.storeNewApplicationAttempt(mockAttempt);
+ assertEquals("RMStateStore should have been in fenced state",
+ true, store.isFencedState());
+
+ long finishTime = submitTime + 1000;
+ // Update attempt
+ ApplicationAttemptStateData newAttemptState =
+ ApplicationAttemptStateData.newInstance(attemptId, container,
+ store.getCredentialsFromAppAttempt(mockAttempt),
+ startTime, RMAppAttemptState.FINISHED, "testUrl",
+ "test", FinalApplicationStatus.SUCCEEDED, 100,
+ finishTime, 0, 0);
+ store.updateApplicationAttemptState(newAttemptState);
+ assertEquals("RMStateStore should have been in fenced state",
+ true, store.isFencedState());
+
+ // Update app
+ ApplicationStateData appState = ApplicationStateData.newInstance(submitTime,
+ startTime, context, "test");
+ store.updateApplicationState(appState);
+ assertEquals("RMStateStore should have been in fenced state",
+ true, store.isFencedState());
+
+ // Remove app
+ store.removeApplication(mockApp);
+ assertEquals("RMStateStore should have been in fenced state",
+ true, store.isFencedState());
+
+ store.close();
+ }
}