You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/01/08 18:47:54 UTC
hadoop git commit: YARN-2996. Improved synchronization and I/O
operations of FS- and Mem- RMStateStore. Contributed by Yi Liu.
Repository: hadoop
Updated Branches:
refs/heads/trunk a6ed4894b -> dc2eaa26b
YARN-2996. Improved synchronization and I/O operations of FS- and Mem- RMStateStore. Contributed by Yi Liu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dc2eaa26
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dc2eaa26
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dc2eaa26
Branch: refs/heads/trunk
Commit: dc2eaa26b20cfbbcdd5784bb8761d08a42f29605
Parents: a6ed489
Author: Zhijie Shen <zj...@apache.org>
Authored: Thu Jan 8 09:47:02 2015 -0800
Committer: Zhijie Shen <zj...@apache.org>
Committed: Thu Jan 8 09:47:02 2015 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +++
.../recovery/FileSystemRMStateStore.java | 26 ++++++++++++--------
.../recovery/MemoryRMStateStore.java | 11 +++++----
3 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc2eaa26/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 3f55118..c7e65f1 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -169,6 +169,9 @@ Release 2.7.0 - UNRELEASED
YARN-2880. Added a test to make sure node labels will be recovered
if RM restart is enabled. (Rohith Sharmaks via jianhe)
+ YARN-2996. Improved synchronization and I/O operations of FS- and Mem-
+ RMStateStore. (Yi Liu via zjshen)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc2eaa26/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 7783662..6e830a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -139,8 +139,8 @@ public class FileSystemRMStateStore extends RMStateStore {
@Override
protected synchronized Version loadVersion() throws Exception {
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
- if (fs.exists(versionNodePath)) {
- FileStatus status = fs.getFileStatus(versionNodePath);
+ FileStatus status = getFileStatus(versionNodePath);
+ if (status != null) {
byte[] data = readFile(versionNodePath, status.getLen());
Version version =
new VersionPBImpl(VersionProto.parseFrom(data));
@@ -165,9 +165,9 @@ public class FileSystemRMStateStore extends RMStateStore {
public synchronized long getAndIncrementEpoch() throws Exception {
Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
long currentEpoch = 0;
- if (fs.exists(epochNodePath)) {
+ FileStatus status = getFileStatus(epochNodePath);
+ if (status != null) {
// load current epoch
- FileStatus status = fs.getFileStatus(epochNodePath);
byte[] data = readFile(epochNodePath, status.getLen());
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
currentEpoch = epoch.getEpoch();
@@ -201,13 +201,11 @@ public class FileSystemRMStateStore extends RMStateStore {
checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
Path amrmTokenSecretManagerStateDataDir =
new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
- FileStatus status;
- try {
- status = fs.getFileStatus(amrmTokenSecretManagerStateDataDir);
- assert status.isFile();
- } catch (FileNotFoundException ex) {
+ FileStatus status = getFileStatus(amrmTokenSecretManagerStateDataDir);
+ if (status == null) {
return;
}
+ assert status.isFile();
byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen());
AMRMTokenSecretManagerStatePBImpl stateData =
new AMRMTokenSecretManagerStatePBImpl(
@@ -466,7 +464,7 @@ public class FileSystemRMStateStore extends RMStateStore {
}
@Override
- protected void updateRMDelegationTokenState(
+ protected synchronized void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
storeOrUpdateRMDelegationTokenState(rmDTIdentifier, renewDate, true);
@@ -560,6 +558,14 @@ public class FileSystemRMStateStore extends RMStateStore {
}
}
+ private FileStatus getFileStatus(Path path) throws Exception {
+ try {
+ return fs.getFileStatus(path);
+ } catch (FileNotFoundException e) {
+ return null;
+ }
+ }
+
/*
* In order to make this write atomic as a part of write we will first write
* data to .tmp file and then rename it. Here we are assuming that rename is
http://git-wip-us.apache.org/repos/asf/hadoop/blob/dc2eaa26/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
index 3646949..8cd776e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/MemoryRMStateStore.java
@@ -91,15 +91,16 @@ public class MemoryRMStateStore extends RMStateStore {
}
@Override
- public void storeApplicationStateInternal(
+ public synchronized void storeApplicationStateInternal(
ApplicationId appId, ApplicationStateData appState)
throws Exception {
state.appState.put(appId, appState);
}
@Override
- public void updateApplicationStateInternal(ApplicationId appId,
- ApplicationStateData appState) throws Exception {
+ public synchronized void updateApplicationStateInternal(
+ ApplicationId appId, ApplicationStateData appState)
+ throws Exception {
LOG.info("Updating final state " + appState.getState() + " for app: "
+ appId);
if (state.appState.get(appId) != null) {
@@ -186,7 +187,7 @@ public class MemoryRMStateStore extends RMStateStore {
}
@Override
- protected void updateRMDelegationTokenState(
+ protected synchronized void updateRMDelegationTokenState(
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
throws Exception {
removeRMDelegationTokenState(rmDTIdentifier);
@@ -237,7 +238,7 @@ public class MemoryRMStateStore extends RMStateStore {
}
@Override
- public void storeOrUpdateAMRMTokenSecretManagerState(
+ public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
AMRMTokenSecretManagerState amrmTokenSecretManagerState,
boolean isUpdate) {
if (amrmTokenSecretManagerState != null) {