You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/12/19 03:33:05 UTC
svn commit: r1552209 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-se...
Author: vinodkv
Date: Thu Dec 19 02:33:05 2013
New Revision: 1552209
URL: http://svn.apache.org/r1552209
Log:
YARN-1307. Redesign znode structure for Zookeeper based RM state-store for better organization and scalability. Contributed by Tsuyoshi OZAWA.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1552209&r1=1552208&r2=1552209&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Dec 19 02:33:05 2013
@@ -184,6 +184,9 @@ Release 2.4.0 - UNRELEASED
YARN-1446. Changed client API to retry killing application till RM
acknowledges so as to account for RM crashes/failover. (Jian He via vinodkv)
+ YARN-1307. Redesign znode structure for Zookeeper based RM state-store for
+ better organization and scalability. (Tsuyoshi OZAWA via vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java?rev=1552209&r1=1552208&r2=1552209&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java Thu Dec 19 02:33:05 2013
@@ -287,11 +287,12 @@ public class FileSystemRMStateStore exte
}
@Override
- public synchronized void storeApplicationStateInternal(String appId,
+ public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
- Path appDirPath = getAppDir(rmAppRoot, appId);
+ String appIdStr = appId.toString();
+ Path appDirPath = getAppDir(rmAppRoot, appIdStr);
fs.mkdirs(appDirPath);
- Path nodeCreatePath = getNodePath(appDirPath, appId);
+ Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -306,10 +307,11 @@ public class FileSystemRMStateStore exte
}
@Override
- public synchronized void updateApplicationStateInternal(String appId,
+ public synchronized void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
- Path appDirPath = getAppDir(rmAppRoot, appId);
- Path nodeCreatePath = getNodePath(appDirPath, appId);
+ String appIdStr = appId.toString();
+ Path appDirPath = getAppDir(rmAppRoot, appIdStr);
+ Path nodeCreatePath = getNodePath(appDirPath, appIdStr);
LOG.info("Updating info for app: " + appId + " at: " + nodeCreatePath);
byte[] appStateData = appStateDataPB.getProto().toByteArray();
@@ -325,14 +327,13 @@ public class FileSystemRMStateStore exte
@Override
public synchronized void storeApplicationAttemptStateInternal(
- String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+ ApplicationAttemptId appAttemptId,
+ ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception {
- ApplicationAttemptId appAttemptId =
- ConverterUtils.toApplicationAttemptId(attemptId);
Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
- Path nodeCreatePath = getNodePath(appDirPath, attemptId);
- LOG.info("Storing info for attempt: " + attemptId + " at: "
+ Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
+ LOG.info("Storing info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath);
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
try {
@@ -340,21 +341,20 @@ public class FileSystemRMStateStore exte
// based on whether we have lost the right to write to FS
writeFile(nodeCreatePath, attemptStateData);
} catch (Exception e) {
- LOG.info("Error storing info for attempt: " + attemptId, e);
+ LOG.info("Error storing info for attempt: " + appAttemptId, e);
throw e;
}
}
@Override
public synchronized void updateApplicationAttemptStateInternal(
- String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+ ApplicationAttemptId appAttemptId,
+ ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception {
- ApplicationAttemptId appAttemptId =
- ConverterUtils.toApplicationAttemptId(attemptId);
Path appDirPath =
getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
- Path nodeCreatePath = getNodePath(appDirPath, attemptId);
- LOG.info("Updating info for attempt: " + attemptId + " at: "
+ Path nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
+ LOG.info("Updating info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath);
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
try {
@@ -362,7 +362,7 @@ public class FileSystemRMStateStore exte
// based on whether we have lost the right to write to FS
updateFile(nodeCreatePath, attemptStateData);
} catch (Exception e) {
- LOG.info("Error updating info for attempt: " + attemptId, e);
+ LOG.info("Error updating info for attempt: " + appAttemptId, e);
throw e;
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java?rev=1552209&r1=1552208&r2=1552209&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java Thu Dec 19 02:33:05 2013
@@ -80,7 +80,7 @@ public class MemoryRMStateStore extends
}
@Override
- public void storeApplicationStateInternal(String appId,
+ public void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData)
throws Exception {
ApplicationState appState =
@@ -88,11 +88,11 @@ public class MemoryRMStateStore extends
appStateData.getStartTime(),
appStateData.getApplicationSubmissionContext(),
appStateData.getUser());
- state.appState.put(appState.getAppId(), appState);
+ state.appState.put(appId, appState);
}
@Override
- public void updateApplicationStateInternal(String appId,
+ public void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception {
ApplicationState updatedAppState =
new ApplicationState(appStateData.getSubmitTime(),
@@ -102,21 +102,19 @@ public class MemoryRMStateStore extends
appStateData.getDiagnostics(), appStateData.getFinishTime());
LOG.info("Updating final state " + appStateData.getState() + " for app: "
+ appId);
- ApplicationId applicationId = updatedAppState.getAppId();
- if (state.appState.get(applicationId) != null) {
+ if (state.appState.get(appId) != null) {
// add the earlier attempts back
updatedAppState.attempts
- .putAll(state.appState.get(applicationId).attempts);
+ .putAll(state.appState.get(appId).attempts);
}
- state.appState.put(applicationId, updatedAppState);
+ state.appState.put(appId, updatedAppState);
}
@Override
- public synchronized void storeApplicationAttemptStateInternal(String attemptIdStr,
- ApplicationAttemptStateDataPBImpl attemptStateData)
- throws Exception {
- ApplicationAttemptId attemptId = ConverterUtils
- .toApplicationAttemptId(attemptIdStr);
+ public synchronized void storeApplicationAttemptStateInternal(
+ ApplicationAttemptId appAttemptId,
+ ApplicationAttemptStateDataPBImpl attemptStateData)
+ throws Exception {
Credentials credentials = null;
if(attemptStateData.getAppAttemptTokens() != null){
DataInputByteBuffer dibb = new DataInputByteBuffer();
@@ -125,7 +123,7 @@ public class MemoryRMStateStore extends
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState attemptState =
- new ApplicationAttemptState(attemptId,
+ new ApplicationAttemptState(appAttemptId,
attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime());
@@ -139,10 +137,9 @@ public class MemoryRMStateStore extends
@Override
public synchronized void updateApplicationAttemptStateInternal(
- String attemptIdStr, ApplicationAttemptStateDataPBImpl attemptStateData)
+ ApplicationAttemptId appAttemptId,
+ ApplicationAttemptStateDataPBImpl attemptStateData)
throws Exception {
- ApplicationAttemptId attemptId =
- ConverterUtils.toApplicationAttemptId(attemptIdStr);
Credentials credentials = null;
if (attemptStateData.getAppAttemptTokens() != null) {
DataInputByteBuffer dibb = new DataInputByteBuffer();
@@ -151,7 +148,7 @@ public class MemoryRMStateStore extends
credentials.readTokenStorageStream(dibb);
}
ApplicationAttemptState updatedAttemptState =
- new ApplicationAttemptState(attemptId,
+ new ApplicationAttemptState(appAttemptId,
attemptStateData.getMasterContainer(), credentials,
attemptStateData.getStartTime(), attemptStateData.getState(),
attemptStateData.getFinalTrackingUrl(),
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java?rev=1552209&r1=1552208&r2=1552209&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/NullRMStateStore.java Thu Dec 19 02:33:05 2013
@@ -22,6 +22,8 @@ package org.apache.hadoop.yarn.server.re
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
@@ -51,13 +53,13 @@ public class NullRMStateStore extends RM
}
@Override
- protected void storeApplicationStateInternal(String appId,
+ protected void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception {
// Do nothing
}
@Override
- protected void storeApplicationAttemptStateInternal(String attemptId,
+ protected void storeApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
// Do nothing
}
@@ -92,13 +94,13 @@ public class NullRMStateStore extends RM
}
@Override
- protected void updateApplicationStateInternal(String appId,
+ protected void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception {
// Do nothing
}
@Override
- protected void updateApplicationAttemptStateInternal(String attemptId,
+ protected void updateApplicationAttemptStateInternal(ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java?rev=1552209&r1=1552208&r2=1552209&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java Thu Dec 19 02:33:05 2013
@@ -387,10 +387,10 @@ public abstract class RMStateStore exten
* Derived classes must implement this method to store the state of an
* application.
*/
- protected abstract void storeApplicationStateInternal(String appId,
+ protected abstract void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception;
- protected abstract void updateApplicationStateInternal(String appId,
+ protected abstract void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception;
@SuppressWarnings("unchecked")
@@ -424,10 +424,12 @@ public abstract class RMStateStore exten
* Derived classes must implement this method to store the state of an
* application attempt
*/
- protected abstract void storeApplicationAttemptStateInternal(String attemptId,
+ protected abstract void storeApplicationAttemptStateInternal(
+ ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
- protected abstract void updateApplicationAttemptStateInternal(String attemptId,
+ protected abstract void updateApplicationAttemptStateInternal(
+ ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception;
/**
@@ -592,11 +594,11 @@ public abstract class RMStateStore exten
LOG.info("Storing info for app: " + appId);
try {
if (event.getType().equals(RMStateStoreEventType.STORE_APP)) {
- storeApplicationStateInternal(appId.toString(), appStateData);
+ storeApplicationStateInternal(appId, appStateData);
notifyDoneStoringApplication(appId, storedException);
} else {
assert event.getType().equals(RMStateStoreEventType.UPDATE_APP);
- updateApplicationStateInternal(appId.toString(), appStateData);
+ updateApplicationStateInternal(appId, appStateData);
notifyDoneUpdatingApplication(appId, storedException);
}
} catch (Exception e) {
@@ -637,15 +639,15 @@ public abstract class RMStateStore exten
LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
}
if (event.getType().equals(RMStateStoreEventType.STORE_APP_ATTEMPT)) {
- storeApplicationAttemptStateInternal(attemptState.getAttemptId()
- .toString(), attemptStateData);
+ storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
+ attemptStateData);
notifyDoneStoringApplicationAttempt(attemptState.getAttemptId(),
storedException);
} else {
assert event.getType().equals(
RMStateStoreEventType.UPDATE_APP_ATTEMPT);
- updateApplicationAttemptStateInternal(attemptState.getAttemptId()
- .toString(), attemptStateData);
+ updateApplicationAttemptStateInternal(attemptState.getAttemptId(),
+ attemptStateData);
notifyDoneUpdatingApplicationAttempt(attemptState.getAttemptId(),
storedException);
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java?rev=1552209&r1=1552208&r2=1552209&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java Thu Dec 19 02:33:05 2013
@@ -78,16 +78,51 @@ public class ZKRMStateStore extends RMSt
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
.newInstance(1, 0);
+ private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
+ "RMDelegationTokensRoot";
+ private static final String RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME =
+ "RMDTSequentialNumber";
+ private static final String RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME =
+ "RMDTMasterKeysRoot";
private int numRetries;
private String zkHostPort = null;
private int zkSessionTimeout;
private long zkRetryInterval;
private List<ACL> zkAcl;
+
+ /**
+ *
+ * ROOT_DIR_PATH
+ * |--- VERSION_INFO
+ * |--- RM_ZK_FENCING_LOCK
+ * |--- RM_APP_ROOT
+ * | |----- (#ApplicationId1)
+ * | | |----- (#ApplicationAttemptIds)
+ * | |
+ * | |----- (#ApplicationId2)
+ * | | |----- (#ApplicationAttemptIds)
+ * | ....
+ * |
+ * |--- RM_DT_SECRET_MANAGER_ROOT
+ * |----- RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME
+ * |----- RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME
+ * | |----- Token_1
+ * | |----- Token_2
+ * | ....
+ * |
+ * |----- RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME
+ * | |----- Key_1
+ * | |----- Key_2
+ * ....
+ *
+ */
private String zkRootNodePath;
- private String rmDTSecretManagerRoot;
private String rmAppRoot;
- private String dtSequenceNumberPath = null;
+ private String rmDTSecretManagerRoot;
+ private String dtMasterKeysRootPath;
+ private String delegationTokensRootPath;
+ private String dtSequenceNumberPath;
@VisibleForTesting
protected String znodeWorkingPath;
@@ -178,12 +213,11 @@ public class ZKRMStateStore extends RMSt
throw bafe;
}
- zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME;
- rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT;
- rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT;
+ zkRootNodePath = getNodePath(znodeWorkingPath, ROOT_ZNODE_NAME);
+ rmAppRoot = getNodePath(zkRootNodePath, RM_APP_ROOT);
/* Initialize fencing related paths, acls, and ops */
- fencingNodePath = zkRootNodePath + "/" + FENCING_LOCK;
+ fencingNodePath = getNodePath(zkRootNodePath, FENCING_LOCK);
createFencingNodePathOp = Op.create(fencingNodePath, new byte[0], zkAcl,
CreateMode.PERSISTENT);
deleteFencingNodePathOp = Op.delete(fencingNodePath, -1);
@@ -204,6 +238,15 @@ public class ZKRMStateStore extends RMSt
zkRootNodeAcl = constructZkRootNodeACL(conf, zkAcl);
}
}
+
+ rmDTSecretManagerRoot =
+ getNodePath(zkRootNodePath, RM_DT_SECRET_MANAGER_ROOT);
+ dtMasterKeysRootPath = getNodePath(rmDTSecretManagerRoot,
+ RM_DT_MASTER_KEYS_ROOT_ZNODE_NAME);
+ delegationTokensRootPath = getNodePath(rmDTSecretManagerRoot,
+ RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME);
+ dtSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
+ RM_DT_SEQUENTIAL_NUMBER_ZNODE_NAME);
}
@Override
@@ -217,8 +260,11 @@ public class ZKRMStateStore extends RMSt
if (HAUtil.isHAEnabled(getConfig())){
fence();
}
- createRootDir(rmDTSecretManagerRoot);
createRootDir(rmAppRoot);
+ createRootDir(rmDTSecretManagerRoot);
+ createRootDir(dtMasterKeysRootPath);
+ createRootDir(delegationTokensRootPath);
+ createRootDir(dtSequenceNumberPath);
}
private void createRootDir(final String rootPath) throws Exception {
@@ -350,26 +396,69 @@ public class ZKRMStateStore extends RMSt
private synchronized void loadRMDTSecretManagerState(RMState rmState)
throws Exception {
- List<String> childNodes =
- getChildrenWithRetries(rmDTSecretManagerRoot, true);
+ loadRMDelegationKeyState(rmState);
+ loadRMSequentialNumberState(rmState);
+ loadRMDelegationTokenState(rmState);
+ }
+ private void loadRMDelegationKeyState(RMState rmState) throws Exception {
+ List<String> childNodes =
+ getChildrenWithRetries(dtMasterKeysRootPath, true);
for (String childNodeName : childNodes) {
- if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
- rmState.rmSecretManagerState.dtSequenceNumber =
- Integer.parseInt(childNodeName.split("_")[1]);
+ String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
+ byte[] childData = getDataWithRetries(childNodePath, true);
+
+ if (childData == null) {
+ LOG.warn("Content of " + childNodePath + " is broken.");
continue;
}
- String childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
- byte[] childData = getDataWithRetries(childNodePath, true);
ByteArrayInputStream is = new ByteArrayInputStream(childData);
DataInputStream fsIn = new DataInputStream(is);
+
try {
if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
DelegationKey key = new DelegationKey();
key.readFields(fsIn);
rmState.rmSecretManagerState.masterKeyState.add(key);
- } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
+ }
+ } finally {
+ is.close();
+ }
+ }
+ }
+
+ private void loadRMSequentialNumberState(RMState rmState) throws Exception {
+ byte[] seqData = getDataWithRetries(dtSequenceNumberPath, false);
+ if (seqData != null) {
+ ByteArrayInputStream seqIs = new ByteArrayInputStream(seqData);
+ DataInputStream seqIn = new DataInputStream(seqIs);
+
+ try {
+ rmState.rmSecretManagerState.dtSequenceNumber = seqIn.readInt();
+ } finally {
+ seqIn.close();
+ }
+ }
+ }
+
+ private void loadRMDelegationTokenState(RMState rmState) throws Exception {
+ List<String> childNodes = zkClient.getChildren(delegationTokensRootPath, true);
+ for (String childNodeName : childNodes) {
+ String childNodePath =
+ getNodePath(delegationTokensRootPath, childNodeName);
+ byte[] childData = getDataWithRetries(childNodePath, true);
+
+ if (childData == null) {
+ LOG.warn("Content of " + childNodePath + " is broken.");
+ continue;
+ }
+
+ ByteArrayInputStream is = new ByteArrayInputStream(childData);
+ DataInputStream fsIn = new DataInputStream(is);
+
+ try {
+ if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
RMDelegationTokenIdentifier identifier =
new RMDelegationTokenIdentifier();
identifier.readFields(fsIn);
@@ -385,8 +474,6 @@ public class ZKRMStateStore extends RMSt
private synchronized void loadRMAppState(RMState rmState) throws Exception {
List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
- List<ApplicationAttemptState> attempts =
- new ArrayList<ApplicationAttemptState>();
for (String childNodeName : childNodes) {
String childNodePath = getNodePath(rmAppRoot, childNodeName);
byte[] childData = getDataWithRetries(childNodePath, true);
@@ -411,17 +498,28 @@ public class ZKRMStateStore extends RMSt
"from the application id");
}
rmState.appState.put(appId, appState);
- } else if (childNodeName
- .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
- // attempt
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loading application attempt from znode: " + childNodeName);
- }
+ loadApplicationAttemptState(appState, appId);
+ } else {
+ LOG.info("Unknown child node with name: " + childNodeName);
+ }
+ }
+ }
+
+ private void loadApplicationAttemptState(ApplicationState appState,
+ ApplicationId appId)
+ throws Exception {
+ String appPath = getNodePath(rmAppRoot, appId.toString());
+ List<String> attempts = getChildrenWithRetries(appPath, false);
+ for (String attemptIDStr : attempts) {
+ if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
+ String attemptPath = getNodePath(appPath, attemptIDStr);
+ byte[] attemptData = getDataWithRetries(attemptPath, true);
+
ApplicationAttemptId attemptId =
- ConverterUtils.toApplicationAttemptId(childNodeName);
+ ConverterUtils.toApplicationAttemptId(attemptIDStr);
ApplicationAttemptStateDataPBImpl attemptStateData =
new ApplicationAttemptStateDataPBImpl(
- ApplicationAttemptStateDataProto.parseFrom(childData));
+ ApplicationAttemptStateDataProto.parseFrom(attemptData));
Credentials credentials = null;
if (attemptStateData.getAppAttemptTokens() != null) {
credentials = new Credentials();
@@ -429,47 +527,26 @@ public class ZKRMStateStore extends RMSt
dibb.reset(attemptStateData.getAppAttemptTokens());
credentials.readTokenStorageStream(dibb);
}
+
ApplicationAttemptState attemptState =
new ApplicationAttemptState(attemptId,
- attemptStateData.getMasterContainer(), credentials,
- attemptStateData.getStartTime(),
- attemptStateData.getState(),
- attemptStateData.getFinalTrackingUrl(),
- attemptStateData.getDiagnostics(),
- attemptStateData.getFinalApplicationStatus());
- if (!attemptId.equals(attemptState.getAttemptId())) {
- throw new YarnRuntimeException("The child node name is different " +
- "from the application attempt id");
- }
- attempts.add(attemptState);
- } else {
- LOG.info("Unknown child node with name: " + childNodeName);
- }
- }
+ attemptStateData.getMasterContainer(), credentials,
+ attemptStateData.getStartTime(),
+ attemptStateData.getState(),
+ attemptStateData.getFinalTrackingUrl(),
+ attemptStateData.getDiagnostics(),
+ attemptStateData.getFinalApplicationStatus());
- // go through all attempts and add them to their apps
- for (ApplicationAttemptState attemptState : attempts) {
- ApplicationId appId = attemptState.getAttemptId().getApplicationId();
- ApplicationState appState = rmState.appState.get(appId);
- if (appState != null) {
appState.attempts.put(attemptState.getAttemptId(), attemptState);
- } else {
- // the application znode may have been removed when the application
- // completed but the RM might have stopped before it could remove the
- // application attempt znodes
- LOG.info("Application node not found for attempt: "
- + attemptState.getAttemptId());
- deleteWithRetries(
- getNodePath(rmAppRoot, attemptState.getAttemptId().toString()), -1);
}
}
LOG.info("Done Loading applications from ZK state store");
}
@Override
- public synchronized void storeApplicationStateInternal(String appId,
+ public synchronized void storeApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
- String nodeCreatePath = getNodePath(rmAppRoot, appId);
+ String nodeCreatePath = getNodePath(rmAppRoot, appId.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
@@ -481,25 +558,29 @@ public class ZKRMStateStore extends RMSt
}
@Override
- public synchronized void updateApplicationStateInternal(String appId,
+ public synchronized void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
- String nodeCreatePath = getNodePath(rmAppRoot, appId);
+ String nodeUpdatePath = getNodePath(rmAppRoot, appId.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("Storing final state info for app: " + appId + " at: "
- + nodeCreatePath);
+ + nodeUpdatePath);
}
byte[] appStateData = appStateDataPB.getProto().toByteArray();
- setDataWithRetries(nodeCreatePath, appStateData, 0);
+ setDataWithRetries(nodeUpdatePath, appStateData, 0);
}
@Override
public synchronized void storeApplicationAttemptStateInternal(
- String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+ ApplicationAttemptId appAttemptId,
+ ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception {
- String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+ String appDirPath = getNodePath(rmAppRoot,
+ appAttemptId.getApplicationId().toString());
+ String nodeCreatePath = getNodePath(appDirPath, appAttemptId.toString());
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Storing info for attempt: " + attemptId + " at: "
+ LOG.debug("Storing info for attempt: " + appAttemptId + " at: "
+ nodeCreatePath);
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
@@ -509,31 +590,36 @@ public class ZKRMStateStore extends RMSt
@Override
public synchronized void updateApplicationAttemptStateInternal(
- String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+ ApplicationAttemptId appAttemptId,
+ ApplicationAttemptStateDataPBImpl attemptStateDataPB)
throws Exception {
- String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+ String appIdStr = appAttemptId.getApplicationId().toString();
+ String appAttemptIdStr = appAttemptId.toString();
+ String appDirPath = getNodePath(rmAppRoot, appIdStr);
+ String nodeUpdatePath = getNodePath(appDirPath, appAttemptIdStr);
if (LOG.isDebugEnabled()) {
- LOG.debug("Storing final state info for attempt: " + attemptId + " at: "
- + nodeCreatePath);
+ LOG.debug("Storing final state info for attempt: " + appAttemptIdStr
+ + " at: " + nodeUpdatePath);
}
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
- setDataWithRetries(nodeCreatePath, attemptStateData, 0);
+ setDataWithRetries(nodeUpdatePath, attemptStateData, 0);
}
@Override
public synchronized void removeApplicationStateInternal(ApplicationState appState)
throws Exception {
String appId = appState.getAppId().toString();
- String nodeRemovePath = getNodePath(rmAppRoot, appId);
+ String appIdRemovePath = getNodePath(rmAppRoot, appId);
ArrayList<Op> opList = new ArrayList<Op>();
- opList.add(Op.delete(nodeRemovePath, -1));
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
- String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString());
+ String attemptRemovePath = getNodePath(appIdRemovePath, attemptId.toString());
opList.add(Op.delete(attemptRemovePath, -1));
}
+ opList.add(Op.delete(appIdRemovePath, -1));
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath
+ LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
+ " and its attempts.");
}
doMultiWithRetries(opList);
@@ -546,38 +632,37 @@ public class ZKRMStateStore extends RMSt
ArrayList<Op> opList = new ArrayList<Op>();
// store RM delegation token
String nodeCreatePath =
- getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+ getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
- ByteArrayOutputStream os = new ByteArrayOutputStream();
- DataOutputStream fsOut = new DataOutputStream(os);
+ ByteArrayOutputStream tokenOs = new ByteArrayOutputStream();
+ DataOutputStream tokenOut = new DataOutputStream(tokenOs);
+ ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
+ DataOutputStream seqOut = new DataOutputStream(seqOs);
+
try {
- rmDTIdentifier.write(fsOut);
- fsOut.writeLong(renewDate);
+ rmDTIdentifier.write(tokenOut);
+ tokenOut.writeLong(renewDate);
if (LOG.isDebugEnabled()) {
LOG.debug("Storing RMDelegationToken_" +
rmDTIdentifier.getSequenceNumber());
}
- opList.add(Op.create(nodeCreatePath, os.toByteArray(), zkAcl,
+
+ opList.add(Op.create(nodeCreatePath, tokenOs.toByteArray(), zkAcl,
CreateMode.PERSISTENT));
- } finally {
- os.close();
- }
- // store sequence number
- String latestSequenceNumberPath =
- getNodePath(rmDTSecretManagerRoot,
- DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX +
- latestSequenceNumber);
- }
- if (dtSequenceNumberPath != null) {
- opList.add(Op.delete(dtSequenceNumberPath, -1));
+ seqOut.writeInt(latestSequenceNumber);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing " + dtSequenceNumberPath +
+ ". SequenceNumber: " + latestSequenceNumber);
+ }
+
+ opList.add(Op.setData(dtSequenceNumberPath, seqOs.toByteArray(), -1));
+ } finally {
+ tokenOs.close();
+ seqOs.close();
}
- opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
- CreateMode.PERSISTENT));
- dtSequenceNumberPath = latestSequenceNumberPath;
+
doMultiWithRetries(opList);
}
@@ -585,7 +670,7 @@ public class ZKRMStateStore extends RMSt
protected synchronized void removeRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
String nodeRemovePath =
- getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+ getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
+ rmDTIdentifier.getSequenceNumber());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationToken_"
@@ -598,7 +683,7 @@ public class ZKRMStateStore extends RMSt
protected synchronized void storeRMDTMasterKeyState(
DelegationKey delegationKey) throws Exception {
String nodeCreatePath =
- getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+ getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
+ delegationKey.getKeyId());
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream fsOut = new DataOutputStream(os);
@@ -618,7 +703,7 @@ public class ZKRMStateStore extends RMSt
protected synchronized void removeRMDTMasterKeyState(
DelegationKey delegationKey) throws Exception {
String nodeRemovePath =
- getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+ getNodePath(dtMasterKeysRootPath, DELEGATION_KEY_PREFIX
+ delegationKey.getKeyId());
if (LOG.isDebugEnabled()) {
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
@@ -757,8 +842,7 @@ public class ZKRMStateStore extends RMSt
return new ZKAction<byte[]>() {
@Override
public byte[] run() throws KeeperException, InterruptedException {
- Stat stat = new Stat();
- return zkClient.getData(path, watch, stat);
+ return zkClient.getData(path, watch, null);
}
}.runWithRetries();
}
@@ -865,4 +949,5 @@ public class ZKRMStateStore extends RMSt
zk.register(new ForwardingWatcher());
return zk;
}
+
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1552209&r1=1552208&r2=1552209&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Thu Dec 19 02:33:05 2013
@@ -683,14 +683,14 @@ public class TestRMRestart {
MemoryRMStateStore memStore = new MemoryRMStateStore() {
@Override
public synchronized void storeApplicationAttemptStateInternal(
- String attemptIdStr,
+ ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
// ignore attempt saving request.
}
@Override
public synchronized void updateApplicationAttemptStateInternal(
- String attemptIdStr,
+ ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData) throws Exception {
// ignore attempt saving request.
}
@@ -1540,7 +1540,7 @@ public class TestRMRestart {
public int updateAttempt = 0;
@Override
- public void updateApplicationStateInternal(String appId,
+ public void updateApplicationStateInternal(ApplicationId appId,
ApplicationStateDataPBImpl appStateData) throws Exception {
updateApp = ++count;
super.updateApplicationStateInternal(appId, appStateData);
@@ -1548,11 +1548,12 @@ public class TestRMRestart {
@Override
public synchronized void
- updateApplicationAttemptStateInternal(String attemptIdStr,
+ updateApplicationAttemptStateInternal(
+ ApplicationAttemptId attemptId,
ApplicationAttemptStateDataPBImpl attemptStateData)
throws Exception {
updateAttempt = ++count;
- super.updateApplicationAttemptStateInternal(attemptIdStr,
+ super.updateApplicationAttemptStateInternal(attemptId,
attemptStateData);
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java?rev=1552209&r1=1552208&r2=1552209&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java Thu Dec 19 02:33:05 2013
@@ -234,6 +234,12 @@ public class RMStateStoreTestBase extend
attempts.put(attemptIdRemoved, mockRemovedAttempt);
store.removeApplication(mockRemovedApp);
+ // remove application directory recursively.
+ storeApp(store, appIdRemoved, submitTime, startTime);
+ storeAttempt(store, attemptIdRemoved,
+ "container_1352994193343_0002_01_000001", null, null, dispatcher);
+ store.removeApplication(mockRemovedApp);
+
// let things settle down
Thread.sleep(1000);
store.close();
@@ -373,7 +379,30 @@ public class RMStateStoreTestBase extend
Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
Assert.assertEquals(sequenceNumber,
secretManagerState.getDTSequenceNumber());
+
+ // check to delete delegationKey
+ store.removeRMDTMasterKey(key);
+ keySet.clear();
+ RMDTSecretManagerState noKeySecretManagerState =
+ store.loadState().getRMDTSecretManagerState();
+ Assert.assertEquals(token1, noKeySecretManagerState.getTokenState());
+ Assert.assertEquals(keySet, noKeySecretManagerState.getMasterKeyState());
+ Assert.assertEquals(sequenceNumber,
+ noKeySecretManagerState.getDTSequenceNumber());
+
+ // check to delete delegationToken
+ store.removeRMDelegationToken(dtId1, sequenceNumber);
+ RMDTSecretManagerState noKeyAndTokenSecretManagerState =
+ store.loadState().getRMDTSecretManagerState();
+ token1.clear();
+ Assert.assertEquals(token1,
+ noKeyAndTokenSecretManagerState.getTokenState());
+ Assert.assertEquals(keySet,
+ noKeyAndTokenSecretManagerState.getMasterKeyState());
+ Assert.assertEquals(sequenceNumber,
+ noKeySecretManagerState.getDTSequenceNumber());
store.close();
+
}
private Token<AMRMTokenIdentifier> generateAMRMToken(
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java?rev=1552209&r1=1552208&r2=1552209&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java Thu Dec 19 02:33:05 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@@ -178,10 +179,11 @@ public class TestFSRMStateStore extends
@Override
public void run() {
try {
- store.storeApplicationStateInternal("application1",
- (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
- .newApplicationStateData(111, 111, "user", null,
- RMAppState.ACCEPTED, "diagnostics", 333));
+ store.storeApplicationStateInternal(
+ ApplicationId.newInstance(100L, 1),
+ (ApplicationStateDataPBImpl) ApplicationStateDataPBImpl
+ .newApplicationStateData(111, 111, "user", null,
+ RMAppState.ACCEPTED, "diagnostics", 333));
} catch (Exception e) {
// TODO 0 datanode exception will not be retried by dfs client, fix
// that separately.