You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/05/30 06:14:27 UTC
svn commit: r1487720 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-reso...
Author: vinodkv
Date: Thu May 30 04:14:26 2013
New Revision: 1487720
URL: http://svn.apache.org/r1487720
Log:
YARN-638. Modified ResourceManager to restore RMDelegationTokens after restarting. Contributed by Jian He.
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu May 30 04:14:26 2013
@@ -224,6 +224,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-714. Added NMTokens to be sent to AMs as part of heart-beat response.
(Omkar Vinit Joshi via vinodkv)
+ YARN-638. Modified ResourceManager to restore RMDelegationTokens after
+ restarting. (Jian He via vinodkv)
+
OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Thu May 30 04:14:26 2013
@@ -239,7 +239,7 @@ public class ResourceManager extends Com
// Register event handler for RMAppManagerEvents
this.rmDispatcher.register(RMAppManagerEventType.class,
this.rmAppManager);
- this.rmDTSecretManager = createRMDelegationTokenSecretManager();
+ this.rmDTSecretManager = createRMDelegationTokenSecretManager(this.rmContext);
clientRM = createClientRMService();
addService(clientRM);
@@ -666,7 +666,7 @@ public class ResourceManager extends Com
}
protected RMDelegationTokenSecretManager
- createRMDelegationTokenSecretManager() {
+ createRMDelegationTokenSecretManager(RMContext rmContext) {
long secretKeyInterval =
conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
@@ -678,7 +678,7 @@ public class ResourceManager extends Com
YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
return new RMDelegationTokenSecretManager(secretKeyInterval,
- tokenMaxLifetime, tokenRenewInterval, 3600000);
+ tokenMaxLifetime, tokenRenewInterval, 3600000, rmContext);
}
protected ClientRMService createClientRMService() {
@@ -745,6 +745,9 @@ public class ResourceManager extends Com
@Override
public void recover(RMState state) throws Exception {
+ // recover RMdelegationTokenSecretManager
+ rmDTSecretManager.recover(state);
+
// recover applications
rmAppManager.recover(state);
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Thu May 30 04:14:26 2013
@@ -18,6 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.List;
@@ -33,11 +37,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -57,11 +63,19 @@ public class FileSystemRMStateStore exte
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
private static final String ROOT_DIR_NAME = "FSRMStateRoot";
-
+ private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
+ private static final String RM_APP_ROOT = "RMAppRoot";
+ private static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
+ private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
+ private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
+ "RMDTSequenceNumber_";
private FileSystem fs;
- private Path fsRootDirPath;
+ private Path rootDirPath;
+ private Path rmDTSecretManagerRoot;
+ private Path rmAppRoot;
+ private Path dtSequenceNumberPath = null;
@VisibleForTesting
Path fsWorkingPath;
@@ -70,11 +84,14 @@ public class FileSystemRMStateStore exte
throws Exception{
fsWorkingPath = new Path(conf.get(YarnConfiguration.FS_RM_STATE_STORE_URI));
- fsRootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+ rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
+ rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
+ rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
// create filesystem
fs = fsWorkingPath.getFileSystem(conf);
- fs.mkdirs(fsRootDirPath);
+ fs.mkdirs(rmDTSecretManagerRoot);
+ fs.mkdirs(rmAppRoot);
}
@Override
@@ -84,15 +101,23 @@ public class FileSystemRMStateStore exte
@Override
public synchronized RMState loadState() throws Exception {
+ RMState rmState = new RMState();
+ // recover DelegationTokenSecretManager
+ loadRMDTSecretManagerState(rmState);
+ // recover RM applications
+ loadRMAppState(rmState);
+ return rmState;
+ }
+
+ private void loadRMAppState(RMState rmState) throws Exception {
try {
- RMState state = new RMState();
- FileStatus[] childNodes = fs.listStatus(fsRootDirPath);
+ FileStatus[] childNodes = fs.listStatus(rmAppRoot);
List<ApplicationAttemptState> attempts =
new ArrayList<ApplicationAttemptState>();
for(FileStatus childNodeStatus : childNodes) {
assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName();
- Path childNodePath = getNodePath(childNodeName);
+ Path childNodePath = getNodePath(rmAppRoot, childNodeName);
byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){
// application
@@ -107,7 +132,7 @@ public class FileSystemRMStateStore exte
appStateData.getUser());
// assert child node name is same as actual applicationId
assert appId.equals(appState.context.getApplicationId());
- state.appState.put(appId, appState);
+ rmState.appState.put(appId, appState);
} else if(childNodeName.startsWith(
ApplicationAttemptId.appAttemptIdStrPrefix)) {
// attempt
@@ -139,7 +164,7 @@ public class FileSystemRMStateStore exte
// go through all attempts and add them to their apps
for(ApplicationAttemptState attemptState : attempts) {
ApplicationId appId = attemptState.getAttemptId().getApplicationId();
- ApplicationState appState = state.appState.get(appId);
+ ApplicationState appState = rmState.appState.get(appId);
if(appState != null) {
appState.attempts.put(attemptState.getAttemptId(), attemptState);
} else {
@@ -148,22 +173,49 @@ public class FileSystemRMStateStore exte
// application attempt nodes
LOG.info("Application node not found for attempt: "
+ attemptState.getAttemptId());
- deleteFile(getNodePath(attemptState.getAttemptId().toString()));
+ deleteFile(getNodePath(rmAppRoot, attemptState.getAttemptId().toString()));
}
}
-
- return state;
} catch (Exception e) {
LOG.error("Failed to load state.", e);
throw e;
}
}
+ private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
+ FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
+
+ for(FileStatus childNodeStatus : childNodes) {
+ assert childNodeStatus.isFile();
+ String childNodeName = childNodeStatus.getPath().getName();
+ Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
+ byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+ ByteArrayInputStream is = new ByteArrayInputStream(childData);
+ DataInputStream fsIn = new DataInputStream(is);
+ if(childNodeName.startsWith(DELEGATION_KEY_PREFIX)){
+ DelegationKey key = new DelegationKey();
+ key.readFields(fsIn);
+ rmState.rmSecretManagerState.masterKeyState.add(key);
+ } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
+ RMDelegationTokenIdentifier identifier = new RMDelegationTokenIdentifier();
+ identifier.readFields(fsIn);
+ long renewDate = fsIn.readLong();
+ rmState.rmSecretManagerState.delegationTokenState.put(identifier,
+ renewDate);
+ } else if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
+ rmState.rmSecretManagerState.dtSequenceNumber =
+ Integer.parseInt(childNodeName.split("_")[1]);
+ }else {
+ LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
+ }
+ fsIn.close();
+ }
+ }
+
@Override
public synchronized void storeApplicationState(String appId,
- ApplicationStateDataPBImpl appStateDataPB)
- throws Exception {
- Path nodeCreatePath = getNodePath(appId);
+ ApplicationStateDataPBImpl appStateDataPB) throws Exception {
+ Path nodeCreatePath = getNodePath(rmAppRoot, appId);
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -179,9 +231,8 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void storeApplicationAttemptState(String attemptId,
- ApplicationAttemptStateDataPBImpl attemptStateDataPB)
- throws Exception {
- Path nodeCreatePath = getNodePath(attemptId);
+ ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception {
+ Path nodeCreatePath = getNodePath(rmAppRoot, attemptId);
LOG.info("Storing info for attempt: " + attemptId
+ " at: " + nodeCreatePath);
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@@ -197,9 +248,9 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void removeApplicationState(ApplicationState appState)
- throws Exception {
+ throws Exception {
String appId = appState.getAppId().toString();
- Path nodeRemovePath = getNodePath(appId);
+ Path nodeRemovePath = getNodePath(rmAppRoot, appId);
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
deleteFile(nodeRemovePath);
for(ApplicationAttemptId attemptId : appState.attempts.keySet()) {
@@ -208,13 +259,76 @@ public class FileSystemRMStateStore exte
}
public synchronized void removeApplicationAttemptState(String attemptId)
- throws Exception {
- Path nodeRemovePath = getNodePath(attemptId);
+ throws Exception {
+ Path nodeRemovePath = getNodePath(rmAppRoot, attemptId);
LOG.info("Removing info for attempt: " + attemptId
+ " at: " + nodeRemovePath);
deleteFile(nodeRemovePath);
}
+ @Override
+ public synchronized void storeRMDelegationTokenAndSequenceNumberState(
+ RMDelegationTokenIdentifier identifier, Long renewDate,
+ int latestSequenceNumber) throws Exception {
+ Path nodeCreatePath =
+ getNodePath(rmDTSecretManagerRoot,
+ DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ DataOutputStream fsOut = new DataOutputStream(os);
+ LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
+ identifier.write(fsOut);
+ fsOut.writeLong(renewDate);
+ writeFile(nodeCreatePath, os.toByteArray());
+ fsOut.close();
+
+ // store sequence number
+ Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
+ DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
+ LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
+ + latestSequenceNumber);
+ if (dtSequenceNumberPath == null) {
+ if (!createFile(latestSequenceNumberPath)) {
+ throw new Exception("Failed to create " + latestSequenceNumberPath);
+ }
+ } else {
+ if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) {
+ throw new Exception("Failed to rename " + dtSequenceNumberPath);
+ }
+ }
+ dtSequenceNumberPath = latestSequenceNumberPath;
+ }
+
+ @Override
+ public synchronized void removeRMDelegationTokenState(
+ RMDelegationTokenIdentifier identifier) throws Exception {
+ Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
+ DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
+ LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
+ deleteFile(nodeCreatePath);
+ }
+
+ @Override
+ public synchronized void storeRMDTMasterKeyState(DelegationKey masterKey)
+ throws Exception {
+ Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
+ DELEGATION_KEY_PREFIX + masterKey.getKeyId());
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ DataOutputStream fsOut = new DataOutputStream(os);
+ LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
+ masterKey.write(fsOut);
+ writeFile(nodeCreatePath, os.toByteArray());
+ fsOut.close();
+ }
+
+ @Override
+ public synchronized void
+ removeRMDTMasterKeyState(DelegationKey masterKey) throws Exception {
+ Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
+ DELEGATION_KEY_PREFIX + masterKey.getKeyId());
+ LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId());
+ deleteFile(nodeCreatePath);
+ }
+
// FileSystem related code
private void deleteFile(Path deletePath) throws Exception {
@@ -228,18 +342,25 @@ public class FileSystemRMStateStore exte
// state data will not be that "long"
byte[] data = new byte[(int)len];
fsIn.readFully(data);
+ fsIn.close();
return data;
}
private void writeFile(Path outputPath, byte[] data) throws Exception {
FSDataOutputStream fsOut = fs.create(outputPath, false);
fsOut.write(data);
- fsOut.flush();
fsOut.close();
}
- @VisibleForTesting
- Path getNodePath(String nodeName) {
- return new Path(fsRootDirPath, nodeName);
+ private boolean renameFile(Path src, Path dst) throws Exception {
+ return fs.rename(src, dst);
+ }
+
+ private boolean createFile(Path newFile) throws Exception {
+ return fs.createNewFile(newFile);
+ }
+
+ private Path getNodePath(Path root, String nodeName) {
+ return new Path(root, nodeName);
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Thu May 30 04:14:26 2013
@@ -19,14 +19,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -49,6 +53,12 @@ public class MemoryRMStateStore extends
// return a copy of the state to allow for modification of the real state
RMState returnState = new RMState();
returnState.appState.putAll(state.appState);
+ returnState.rmSecretManagerState.getMasterKeyState()
+ .addAll(state.rmSecretManagerState.getMasterKeyState());
+ returnState.rmSecretManagerState.getTokenState().putAll(
+ state.rmSecretManagerState.getTokenState());
+ returnState.rmSecretManagerState.dtSequenceNumber =
+ state.rmSecretManagerState.dtSequenceNumber;
return returnState;
}
@@ -113,4 +123,53 @@ public class MemoryRMStateStore extends
ApplicationState removed = state.appState.remove(appId);
assert removed != null;
}
+
+ @Override
+ public synchronized void storeRMDelegationTokenAndSequenceNumberState(
+ RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
+ int latestSequenceNumber) throws Exception {
+ Map<RMDelegationTokenIdentifier, Long> rmDTState =
+ state.rmSecretManagerState.getTokenState();
+ if (rmDTState.containsKey(rmDTIdentifier)) {
+ IOException e = new IOException("RMDelegationToken: " + rmDTIdentifier
+ + "is already stored.");
+ LOG.info("Error storing info for RMDelegationToken: " + rmDTIdentifier, e);
+ throw e;
+ }
+ rmDTState.put(rmDTIdentifier, renewDate);
+ state.rmSecretManagerState.dtSequenceNumber = latestSequenceNumber;
+ }
+
+ @Override
+ public synchronized void removeRMDelegationTokenState(
+ RMDelegationTokenIdentifier rmDTIdentifier) throws Exception{
+ Map<RMDelegationTokenIdentifier, Long> rmDTState =
+ state.rmSecretManagerState.getTokenState();
+ rmDTState.remove(rmDTIdentifier);
+ }
+
+ @Override
+ public synchronized void storeRMDTMasterKeyState(DelegationKey delegationKey)
+ throws Exception {
+ Set<DelegationKey> rmDTMasterKeyState =
+ state.rmSecretManagerState.getMasterKeyState();
+
+ if (rmDTMasterKeyState.contains(delegationKey)) {
+ IOException e = new IOException("RMDTMasterKey with keyID: "
+ + delegationKey.getKeyId() + " is already stored");
+ LOG.info("Error storing info for RMDTMasterKey with keyID: "
+ + delegationKey.getKeyId(), e);
+ throw e;
+ }
+ state.getRMDTSecretManagerState().getMasterKeyState().add(delegationKey);
+ LOG.info("rmDTMasterKeyState SIZE: " + rmDTMasterKeyState.size());
+ }
+
+ @Override
+ public synchronized void removeRMDTMasterKeyState(DelegationKey delegationKey)
+ throws Exception {
+ Set<DelegationKey> rmDTMasterKeyState =
+ state.rmSecretManagerState.getMasterKeyState();
+ rmDTMasterKeyState.remove(delegationKey);
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Thu May 30 04:14:26 2013
@@ -18,10 +18,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@Unstable
public class NullRMStateStore extends RMStateStore {
@@ -59,4 +62,26 @@ public class NullRMStateStore extends RM
// Do nothing
}
+ @Override
+ public void storeRMDelegationTokenAndSequenceNumberState(
+ RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
+ int latestSequenceNumber) throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ public void removeRMDelegationTokenState(RMDelegationTokenIdentifier rmDTIdentifier)
+ throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ public void storeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception {
+ // Do nothing
+ }
+
+ @Override
+ public void removeRMDTMasterKeyState(DelegationKey delegationKey) throws Exception {
+ // Do nothing
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Thu May 30 04:14:26 2013
@@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.server.re
import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -30,6 +32,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -42,6 +45,7 @@ import org.apache.hadoop.yarn.security.A
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppStoredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -57,7 +61,7 @@ import org.apache.hadoop.yarn.server.res
*/
public abstract class RMStateStore {
public static final Log LOG = LogFactory.getLog(RMStateStore.class);
-
+
/**
* State of an application attempt
*/
@@ -121,17 +125,46 @@ public abstract class RMStateStore {
return user;
}
}
-
+
+ public static class RMDTSecretManagerState {
+ // DTIdentifier -> renewDate
+ Map<RMDelegationTokenIdentifier, Long> delegationTokenState =
+ new HashMap<RMDelegationTokenIdentifier, Long>();
+
+ Set<DelegationKey> masterKeyState =
+ new HashSet<DelegationKey>();
+
+ int dtSequenceNumber = 0;
+
+ public Map<RMDelegationTokenIdentifier, Long> getTokenState() {
+ return delegationTokenState;
+ }
+
+ public Set<DelegationKey> getMasterKeyState() {
+ return masterKeyState;
+ }
+
+ public int getDTSequenceNumber() {
+ return dtSequenceNumber;
+ }
+ }
+
/**
* State of the ResourceManager
*/
public static class RMState {
- Map<ApplicationId, ApplicationState> appState =
- new HashMap<ApplicationId, ApplicationState>();
-
+ Map<ApplicationId, ApplicationState> appState =
+ new HashMap<ApplicationId, ApplicationState>();
+
+ RMDTSecretManagerState rmSecretManagerState = new RMDTSecretManagerState();
+
public Map<ApplicationId, ApplicationState> getApplicationState() {
return appState;
}
+
+ public RMDTSecretManagerState getRMDTSecretManagerState() {
+ return rmSecretManagerState;
+ }
}
private Dispatcher rmDispatcher;
@@ -235,8 +268,76 @@ public abstract class RMStateStore {
protected abstract void storeApplicationAttemptState(String attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData)
throws Exception;
-
-
+
+
+ /**
+ * RMDTSecretManager call this to store the state of a delegation token
+ * and sequence number
+ */
+ public synchronized void storeRMDelegationTokenAndSequenceNumber(
+ RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
+ int latestSequenceNumber) throws Exception {
+ storeRMDelegationTokenAndSequenceNumberState(rmDTIdentifier, renewDate,
+ latestSequenceNumber);
+ }
+
+ /**
+ * Blocking API
+ * Derived classes must implement this method to store the state of
+ * RMDelegationToken and sequence number
+ */
+ protected abstract void storeRMDelegationTokenAndSequenceNumberState(
+ RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
+ int latestSequenceNumber) throws Exception;
+
+ /**
+ * RMDTSecretManager call this to remove the state of a delegation token
+ */
+ public synchronized void removeRMDelegationToken(
+ RMDelegationTokenIdentifier rmDTIdentifier, int sequenceNumber)
+ throws Exception {
+ removeRMDelegationTokenState(rmDTIdentifier);
+ }
+
+ /**
+ * Blocking API
+ * Derived classes must implement this method to remove the state of RMDelegationToken
+ */
+ protected abstract void removeRMDelegationTokenState(
+ RMDelegationTokenIdentifier rmDTIdentifier) throws Exception;
+
+ /**
+ * RMDTSecretManager call this to store the state of a master key
+ */
+ public synchronized void storeRMDTMasterKey(DelegationKey delegationKey)
+ throws Exception {
+ storeRMDTMasterKeyState(delegationKey);
+ }
+
+ /**
+ * Blocking API
+ * Derived classes must implement this method to store the state of
+ * DelegationToken Master Key
+ */
+ protected abstract void storeRMDTMasterKeyState(DelegationKey delegationKey)
+ throws Exception;
+
+ /**
+ * RMDTSecretManager call this to remove the state of a master key
+ */
+ public synchronized void removeRMDTMasterKey(DelegationKey delegationKey)
+ throws Exception {
+ removeRMDTMasterKeyState(delegationKey);
+ }
+
+ /**
+ * Blocking API
+ * Derived classes must implement this method to remove the state of
+ * DelegationToken Master Key
+ */
+ protected abstract void removeRMDTMasterKeyState(DelegationKey delegationKey)
+ throws Exception;
+
/**
* Non-blocking API
* ResourceManager services call this to remove an application from the state
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java Thu May 30 04:14:26 2013
@@ -18,10 +18,26 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* A ResourceManager specific delegation token secret manager.
@@ -30,8 +46,13 @@ import org.apache.hadoop.yarn.security.c
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class RMDelegationTokenSecretManager
- extends AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> {
+public class RMDelegationTokenSecretManager extends
+ AbstractDelegationTokenSecretManager<RMDelegationTokenIdentifier> implements
+ Recoverable {
+ private static final Log LOG = LogFactory
+ .getLog(RMDelegationTokenSecretManager.class);
+
+ protected final RMContext rmContext;
/**
* Create a secret manager
@@ -46,13 +67,132 @@ public class RMDelegationTokenSecretMana
public RMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime,
long delegationTokenRenewInterval,
- long delegationTokenRemoverScanInterval) {
+ long delegationTokenRemoverScanInterval,
+ RMContext rmContext) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+ this.rmContext = rmContext;
}
@Override
public RMDelegationTokenIdentifier createIdentifier() {
return new RMDelegationTokenIdentifier();
}
+
+ @Override
+ protected void storeNewMasterKey(DelegationKey newKey) {
+ try {
+ LOG.info("storing master key with keyID " + newKey.getKeyId());
+ rmContext.getStateStore().storeRMDTMasterKey(newKey);
+ } catch (Exception e) {
+ LOG.error("Error in storing master key with KeyID: " + newKey.getKeyId());
+ ExitUtil.terminate(1, e);
+ }
+ }
+
+ @Override
+ protected void removeStoredMasterKey(DelegationKey key) {
+ try {
+ LOG.info("removing master key with keyID " + key.getKeyId());
+ rmContext.getStateStore().removeRMDTMasterKey(key);
+ } catch (Exception e) {
+ LOG.error("Error in removing master key with KeyID: " + key.getKeyId());
+ ExitUtil.terminate(1, e);
+ }
+ }
+
+ @Override
+ protected void storeNewToken(RMDelegationTokenIdentifier identifier,
+ long renewDate) {
+ try {
+ LOG.info("storing RMDelegation token with sequence number: "
+ + identifier.getSequenceNumber());
+ rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(
+ identifier, renewDate, identifier.getSequenceNumber());
+ } catch (Exception e) {
+ LOG.error("Error in storing RMDelegationToken with sequence number: "
+ + identifier.getSequenceNumber());
+ ExitUtil.terminate(1, e);
+ }
+ }
+
+ @Override
+ protected void updateStoredToken(RMDelegationTokenIdentifier id,
+ long renewDate) {
+ try {
+ LOG.info("updating RMDelegation token with sequence number: "
+ + id.getSequenceNumber());
+ rmContext.getStateStore().removeRMDelegationToken(id,
+ delegationTokenSequenceNumber);
+ rmContext.getStateStore().storeRMDelegationTokenAndSequenceNumber(id,
+ renewDate, id.getSequenceNumber());
+ } catch (Exception e) {
+ LOG.error("Error in updating persisted RMDelegationToken with sequence number: "
+ + id.getSequenceNumber());
+ ExitUtil.terminate(1, e);
+ }
+ }
+
+ @Override
+ protected void removeStoredToken(RMDelegationTokenIdentifier ident)
+ throws IOException {
+ try {
+ LOG.info("removing RMDelegation token with sequence number: "
+ + ident.getSequenceNumber());
+ rmContext.getStateStore().removeRMDelegationToken(ident,
+ delegationTokenSequenceNumber);
+ } catch (Exception e) {
+ LOG.error("Error in removing RMDelegationToken with sequence number: "
+ + ident.getSequenceNumber());
+ ExitUtil.terminate(1, e);
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ public synchronized Set<DelegationKey> getAllMasterKeys() {
+ HashSet<DelegationKey> keySet = new HashSet<DelegationKey>();
+ keySet.addAll(allKeys.values());
+ return keySet;
+ }
+
+ @Private
+ @VisibleForTesting
+ public synchronized Map<RMDelegationTokenIdentifier, Long> getAllTokens() {
+ Map<RMDelegationTokenIdentifier, Long> allTokens =
+ new HashMap<RMDelegationTokenIdentifier, Long>();
+
+ for (Map.Entry<RMDelegationTokenIdentifier,
+ DelegationTokenInformation> entry : currentTokens.entrySet()) {
+ allTokens.put(entry.getKey(), entry.getValue().getRenewDate());
+ }
+ return allTokens;
+ }
+
+ @Private
+ @VisibleForTesting
+ public int getLatestDTSequenceNumber() {
+ return delegationTokenSequenceNumber;
+ }
+
+ @Override
+ public void recover(RMState rmState) throws Exception {
+
+ LOG.info("recovering RMDelegationTokenSecretManager.");
+ // recover RMDTMasterKeys
+ for (DelegationKey dtKey : rmState.getRMDTSecretManagerState()
+ .getMasterKeyState()) {
+ addKey(dtKey);
+ }
+
+ // recover RMDelegationTokens
+ Map<RMDelegationTokenIdentifier, Long> rmDelegationTokens =
+ rmState.getRMDTSecretManagerState().getTokenState();
+ this.delegationTokenSequenceNumber =
+ rmState.getRMDTSecretManagerState().getDTSequenceNumber();
+ for (Map.Entry<RMDelegationTokenIdentifier, Long> entry : rmDelegationTokens
+ .entrySet()) {
+ addPersistedDelegationToken(entry.getKey(), entry.getValue());
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Thu May 30 04:14:26 2013
@@ -291,7 +291,7 @@ public class MockRM extends ResourceMana
@Override
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(),
- rmAppManager, applicationACLsManager, null) {
+ rmAppManager, applicationACLsManager, rmDTSecretManager) {
@Override
public void start() {
// override to not start rpc handler
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java Thu May 30 04:14:26 2013
@@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.factories.
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -90,7 +91,9 @@ public class TestClientRMService {
@BeforeClass
public static void setupSecretManager() throws IOException {
- dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000);
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getStateStore()).thenReturn(new NullRMStateStore());
+ dtsm = new RMDelegationTokenSecretManager(60000, 60000, 60000, 60000, rmContext);
dtsm.startThreads();
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMTokens.java Thu May 30 04:14:26 2013
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetAddress;
@@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@@ -454,10 +456,15 @@ public class TestClientRMTokens {
return mockSched;
}
- private static RMDelegationTokenSecretManager createRMDelegationTokenSecretManager(
- long secretKeyInterval, long tokenMaxLifetime, long tokenRenewInterval) {
- RMDelegationTokenSecretManager rmDtSecretManager = new RMDelegationTokenSecretManager(
- secretKeyInterval, tokenMaxLifetime, tokenRenewInterval, 3600000);
+ private static RMDelegationTokenSecretManager
+ createRMDelegationTokenSecretManager(long secretKeyInterval,
+ long tokenMaxLifetime, long tokenRenewInterval) {
+ RMContext rmContext = mock(RMContext.class);
+ when(rmContext.getStateStore()).thenReturn(new NullRMStateStore());
+
+ RMDelegationTokenSecretManager rmDtSecretManager =
+ new RMDelegationTokenSecretManager(secretKeyInterval, tokenMaxLifetime,
+ tokenRenewInterval, 3600000, rmContext);
return rmDtSecretManager;
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Thu May 30 04:14:26 2013
@@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -25,6 +28,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -33,14 +37,18 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@@ -58,24 +66,32 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
public class TestRMRestart {
-
- @Test
- public void testRMRestart() throws Exception {
+
+ private YarnConfiguration conf;
+
+ @Before
+ public void setup() {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
ExitUtil.disableSystemExit();
-
- YarnConfiguration conf = new YarnConfiguration();
+ conf = new YarnConfiguration();
+ UserGroupInformation.setConfiguration(conf);
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
+ }
+
+ @Test
+ public void testRMRestart() throws Exception {
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@@ -331,13 +347,6 @@ public class TestRMRestart {
@Test
public void testRMRestartOnMaxAppAttempts() throws Exception {
- Logger rootLogger = LogManager.getRootLogger();
- rootLogger.setLevel(Level.DEBUG);
- ExitUtil.disableSystemExit();
-
- YarnConfiguration conf = new YarnConfiguration();
- conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
- conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@@ -411,13 +420,6 @@ public class TestRMRestart {
@Test
public void testDelegationTokenRestoredInDelegationTokenRenewer()
throws Exception {
- Logger rootLogger = LogManager.getRootLogger();
- rootLogger.setLevel(Level.DEBUG);
- ExitUtil.disableSystemExit();
-
- YarnConfiguration conf = new YarnConfiguration();
- conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
- conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
@@ -496,13 +498,6 @@ public class TestRMRestart {
@Test
public void testAppAttemptTokensRestoredOnRMRestart() throws Exception {
- Logger rootLogger = LogManager.getRootLogger();
- rootLogger.setLevel(Level.DEBUG);
- ExitUtil.disableSystemExit();
-
- YarnConfiguration conf = new YarnConfiguration();
- conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
- conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
"kerberos");
@@ -577,7 +572,142 @@ public class TestRMRestart {
rm2.stop();
}
- class TestSecurityMockRM extends MockRM {
+ @Test
+ public void testRMDelegationTokenRestoredOnRMRestart() throws Exception {
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ RMState rmState = memStore.getState();
+
+ Map<ApplicationId, ApplicationState> rmAppState =
+ rmState.getApplicationState();
+ Map<RMDelegationTokenIdentifier, Long> rmDTState =
+ rmState.getRMDTSecretManagerState().getTokenState();
+ Set<DelegationKey> rmDTMasterKeyState =
+ rmState.getRMDTSecretManagerState().getMasterKeyState();
+
+ MockRM rm1 = new TestSecurityMockRM(conf, memStore);
+ rm1.start();
+
+ // create an empty credential
+ Credentials ts = new Credentials();
+
+ // request a token and add into credential
+ GetDelegationTokenRequest request1 = mock(GetDelegationTokenRequest.class);
+ when(request1.getRenewer()).thenReturn("renewer1");
+ GetDelegationTokenResponse response1 =
+ rm1.getClientRMService().getDelegationToken(request1);
+ DelegationToken delegationToken1 = response1.getRMDelegationToken();
+ Token<RMDelegationTokenIdentifier> token1 =
+ ProtoUtils.convertFromProtoFormat(delegationToken1, null);
+ RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
+
+ HashSet<RMDelegationTokenIdentifier> tokenIdentSet =
+ new HashSet<RMDelegationTokenIdentifier>();
+ ts.addToken(token1.getService(), token1);
+ tokenIdentSet.add(dtId1);
+
+ // submit an app with customized credential
+ RMApp app = rm1.submitApp(200, "name", "user",
+ new HashMap<ApplicationAccessType, String>(), false, "default", 1, ts);
+
+ // assert app info is saved
+ ApplicationState appState = rmAppState.get(app.getApplicationId());
+ Assert.assertNotNull(appState);
+
+ // assert all master keys are saved
+ Set<DelegationKey> allKeysRM1 = rm1.getRMDTSecretManager().getAllMasterKeys();
+ Assert.assertEquals(allKeysRM1, rmDTMasterKeyState);
+
+ // assert all tokens are saved
+ Map<RMDelegationTokenIdentifier, Long> allTokensRM1 =
+ rm1.getRMDTSecretManager().getAllTokens();
+ Assert.assertEquals(tokenIdentSet, allTokensRM1.keySet());
+ Assert.assertEquals(allTokensRM1, rmDTState);
+
+ // assert sequence number is saved
+ Assert.assertEquals(
+ rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
+ rmState.getRMDTSecretManagerState().getDTSequenceNumber());
+
+ // request one more token
+ GetDelegationTokenRequest request2 = mock(GetDelegationTokenRequest.class);
+ when(request2.getRenewer()).thenReturn("renewer2");
+ GetDelegationTokenResponse response2 =
+ rm1.getClientRMService().getDelegationToken(request2);
+ DelegationToken delegationToken2 = response2.getRMDelegationToken();
+ Token<RMDelegationTokenIdentifier> token2 =
+ ProtoUtils.convertFromProtoFormat(delegationToken2, null);
+ RMDelegationTokenIdentifier dtId2 = token2.decodeIdentifier();
+
+ // cancel token2
+ try{
+ rm1.getRMDTSecretManager().cancelToken(token2,
+ UserGroupInformation.getCurrentUser().getUserName());
+ } catch(Exception e) {
+ Assert.fail();
+ }
+
+ // Assert the token which has the latest delegationTokenSequenceNumber is removed
+ Assert.assertEquals(
+ rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
+ dtId2.getSequenceNumber());
+ Assert.assertFalse(rmDTState.containsKey(dtId2));
+
+ // start new RM
+ MockRM rm2 = new TestSecurityMockRM(conf, memStore);
+ rm2.start();
+
+ // assert master keys and tokens are populated back to DTSecretManager
+ Map<RMDelegationTokenIdentifier, Long> allTokensRM2 =
+ rm2.getRMDTSecretManager().getAllTokens();
+ Assert.assertEquals(allTokensRM1, allTokensRM2);
+ // rm2 has its own master keys when it starts, we use containsAll here
+ Assert.assertTrue(rm2.getRMDTSecretManager().getAllMasterKeys()
+ .containsAll(allKeysRM1));
+
+ // assert sequenceNumber is properly recovered,
+ // even though the token which has max sequenceNumber is not stored
+ Assert.assertEquals(rm1.getRMDTSecretManager().getLatestDTSequenceNumber(),
+ rm2.getRMDTSecretManager().getLatestDTSequenceNumber());
+
+ // renewDate before renewing
+ Long renewDateBeforeRenew = allTokensRM2.get(dtId1);
+ try{
+ // renew recovered token
+ rm2.getRMDTSecretManager().renewToken(token1, "renewer1");
+ } catch(Exception e) {
+ Assert.fail();
+ }
+
+ allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens();
+ Long renewDateAfterRenew = allTokensRM2.get(dtId1);
+ // assert token is renewed
+ Assert.assertTrue(renewDateAfterRenew > renewDateBeforeRenew);
+
+ // assert new token is added into state store
+ Assert.assertTrue(rmDTState.containsValue(renewDateAfterRenew));
+ // assert old token is removed from state store
+ Assert.assertFalse(rmDTState.containsValue(renewDateBeforeRenew));
+
+ try{
+ rm2.getRMDTSecretManager().cancelToken(token1,
+ UserGroupInformation.getCurrentUser().getUserName());
+ } catch(Exception e) {
+ Assert.fail();
+ }
+
+ // assert token is removed from state after its cancelled
+ allTokensRM2 = rm2.getRMDTSecretManager().getAllTokens();
+ Assert.assertFalse(allTokensRM2.containsKey(dtId1));
+ Assert.assertFalse(rmDTState.containsKey(dtId1));
+
+ // stop the RM
+ rm1.stop();
+ rm2.stop();
+ }
+
+ public static class TestSecurityMockRM extends MockRM {
public TestSecurityMockRM(Configuration conf, RMStateStore store) {
super(conf, store);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java?rev=1487720&r1=1487719&r2=1487720&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java Thu May 30 04:14:26 2013
@@ -31,6 +31,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import junit.framework.Assert;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.HdfsConfig
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -53,8 +56,10 @@ import org.apache.hadoop.yarn.event.Disp
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.ApplicationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -111,7 +116,8 @@ public class TestRMStateStore {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
- testRMStateStore(fsTester);
+ testRMAppStateStore(fsTester);
+ testRMDTSecretManagerStateStore(fsTester);
} finally {
cluster.shutdown();
}
@@ -218,7 +224,7 @@ public class TestRMStateStore {
}
@SuppressWarnings("unchecked")
- void testRMStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
+ void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
long submitTime = System.currentTimeMillis();
Configuration conf = new YarnConfiguration();
RMStateStore store = stateStoreHelper.getRMStateStore();
@@ -334,6 +340,37 @@ public class TestRMStateStore {
store.close();
}
+ public void testRMDTSecretManagerStateStore(
+ RMStateStoreHelper stateStoreHelper) throws Exception {
+ RMStateStore store = stateStoreHelper.getRMStateStore();
+ TestDispatcher dispatcher = new TestDispatcher();
+ store.setDispatcher(dispatcher);
+
+ // store RM delegation token;
+ RMDelegationTokenIdentifier dtId1 =
+ new RMDelegationTokenIdentifier(new Text("owner1"),
+ new Text("renewer1"), new Text("realuser1"));
+ Long renewDate1 = new Long(System.currentTimeMillis());
+ int sequenceNumber = 1111;
+ store.storeRMDelegationTokenAndSequenceNumber(dtId1, renewDate1,
+ sequenceNumber);
+ Map<RMDelegationTokenIdentifier, Long> token1 =
+ new HashMap<RMDelegationTokenIdentifier, Long>();
+ token1.put(dtId1, renewDate1);
+
+ // store delegation key;
+ DelegationKey key = new DelegationKey(1234, 4321 , "keyBytes".getBytes());
+ HashSet<DelegationKey> keySet = new HashSet<DelegationKey>();
+ keySet.add(key);
+ store.storeRMDTMasterKey(key);
+
+ RMDTSecretManagerState secretManagerState =
+ store.loadState().getRMDTSecretManagerState();
+ Assert.assertEquals(token1, secretManagerState.getTokenState());
+ Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
+ Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber());
+ }
+
private List<Token<?>> generateTokens(ApplicationAttemptId attemptId,
ApplicationTokenSecretManager appTokenMgr,
ClientToAMTokenSecretManagerInRM clientTokenMgr, Configuration conf) {
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java?rev=1487720&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestRMDelegationTokens.java Thu May 30 04:14:26 2013
@@ -0,0 +1,206 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.security;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.records.DelegationToken;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRMDelegationTokens {
+
+ private YarnConfiguration conf;
+
+ @Before
+ public void setup() {
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.DEBUG);
+ ExitUtil.disableSystemExit();
+ conf = new YarnConfiguration();
+ UserGroupInformation.setConfiguration(conf);
+ conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+ conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
+ }
+
+ @Test(timeout = 15000)
+ public void testRMDTMasterKeyStateOnRollingMasterKey() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ RMState rmState = memStore.getState();
+
+ Map<RMDelegationTokenIdentifier, Long> rmDTState =
+ rmState.getRMDTSecretManagerState().getTokenState();
+ Set<DelegationKey> rmDTMasterKeyState =
+ rmState.getRMDTSecretManagerState().getMasterKeyState();
+
+ MockRM rm1 = new MyMockRM(conf, memStore);
+ rm1.start();
+ // on rm start, two master keys are created.
+ // One is created at RMDTSecretMgr.startThreads.updateCurrentKey();
+ // the other is created on the first run of
+ // tokenRemoverThread.rollMasterKey()
+
+ RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager();
+ // assert all master keys are saved
+ Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState);
+ Set<DelegationKey> expiringKeys = new HashSet<DelegationKey>();
+ expiringKeys.addAll(dtSecretManager.getAllMasterKeys());
+
+ // record the current key
+ DelegationKey oldCurrentKey =
+ ((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey();
+
+ // request to generate a RMDelegationToken
+ GetDelegationTokenRequest request = mock(GetDelegationTokenRequest.class);
+ when(request.getRenewer()).thenReturn("renewer1");
+ GetDelegationTokenResponse response =
+ rm1.getClientRMService().getDelegationToken(request);
+ DelegationToken delegationToken = response.getRMDelegationToken();
+ Token<RMDelegationTokenIdentifier> token1 =
+ ProtoUtils.convertFromProtoFormat(delegationToken, null);
+ RMDelegationTokenIdentifier dtId1 = token1.decodeIdentifier();
+
+ // wait for the first rollMasterKey
+ while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys
+ .get() < 1){
+ Thread.sleep(200);
+ }
+
+ // assert old-current-key and new-current-key exist
+ Assert.assertTrue(rmDTMasterKeyState.contains(oldCurrentKey));
+ DelegationKey newCurrentKey =
+ ((TestRMDelegationTokenSecretManager) dtSecretManager).getCurrentKey();
+ Assert.assertTrue(rmDTMasterKeyState.contains(newCurrentKey));
+
+ // wait for token to expire
+ // rollMasterKey is called every 1 second.
+ while (((TestRMDelegationTokenSecretManager) dtSecretManager).numUpdatedKeys
+ .get() < 6) {
+ Thread.sleep(200);
+ }
+
+ Assert.assertFalse(rmDTState.containsKey(dtId1));
+ rm1.stop();
+ }
+
+ @Test(timeout = 15000)
+ public void testRemoveExpiredMasterKeyInRMStateStore() throws Exception {
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ RMState rmState = memStore.getState();
+
+ Set<DelegationKey> rmDTMasterKeyState =
+ rmState.getRMDTSecretManagerState().getMasterKeyState();
+
+ MockRM rm1 = new MyMockRM(conf, memStore);
+ rm1.start();
+ RMDelegationTokenSecretManager dtSecretManager = rm1.getRMDTSecretManager();
+
+ // assert all master keys are saved
+ Assert.assertEquals(dtSecretManager.getAllMasterKeys(), rmDTMasterKeyState);
+ Set<DelegationKey> expiringKeys = new HashSet<DelegationKey>();
+ expiringKeys.addAll(dtSecretManager.getAllMasterKeys());
+
+ // wait for expiringKeys to expire
+ while (true) {
+ boolean allExpired = true;
+ for (DelegationKey key : expiringKeys) {
+ if (rmDTMasterKeyState.contains(key)) {
+ allExpired = false;
+ }
+ }
+ if (allExpired)
+ break;
+ Thread.sleep(500);
+ }
+ }
+
+ class MyMockRM extends TestSecurityMockRM {
+
+ public MyMockRM(Configuration conf, RMStateStore store) {
+ super(conf, store);
+ }
+
+ @Override
+ protected RMDelegationTokenSecretManager
+ createRMDelegationTokenSecretManager(RMContext rmContext) {
+ // KeyUpdateInterval-> 1 seconds
+ // TokenMaxLifetime-> 2 seconds.
+ return new TestRMDelegationTokenSecretManager(1000, 1000, 2000, 1000,
+ rmContext);
+ }
+ }
+
+ public class TestRMDelegationTokenSecretManager extends
+ RMDelegationTokenSecretManager {
+ public AtomicInteger numUpdatedKeys = new AtomicInteger(0);
+
+ public TestRMDelegationTokenSecretManager(long delegationKeyUpdateInterval,
+ long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
+ long delegationTokenRemoverScanInterval, RMContext rmContext) {
+ super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
+ delegationTokenRenewInterval, delegationTokenRemoverScanInterval,
+ rmContext);
+ }
+
+ @Override
+ protected void storeNewMasterKey(DelegationKey newKey) {
+ super.storeNewMasterKey(newKey);
+ numUpdatedKeys.incrementAndGet();
+ }
+
+ public DelegationKey getCurrentKey() {
+ for (int keyId : allKeys.keySet()) {
+ if (keyId == currentId) {
+ return allKeys.get(keyId);
+ }
+ }
+ return null;
+ }
+ }
+
+}