You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/12/02 01:45:33 UTC
hadoop git commit: YARN-2765. Added leveldb-based implementation for
RMStateStore. Contributed by Jason Lowe (cherry picked from commit
a7fba0bc28764e0fb36c335ea60cc58079fe007f)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 b6d7b789b -> d208c9014
YARN-2765. Added leveldb-based implementation for RMStateStore. Contributed by Jason Lowe
(cherry picked from commit a7fba0bc28764e0fb36c335ea60cc58079fe007f)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d208c901
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d208c901
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d208c901
Branch: refs/heads/branch-2
Commit: d208c90148ca0231e662ff3d8ff6c0f57acef6a7
Parents: b6d7b78
Author: Jian He <ji...@apache.org>
Authored: Mon Dec 1 16:38:25 2014 -0800
Committer: Jian He <ji...@apache.org>
Committed: Mon Dec 1 16:45:17 2014 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 2 +
.../hadoop/yarn/conf/YarnConfiguration.java | 3 +
.../src/main/resources/yarn-default.xml | 8 +
.../hadoop-yarn-server-resourcemanager/pom.xml | 4 +
.../recovery/LeveldbRMStateStore.java | 691 +++++++++++++++++++
.../recovery/RMStateStoreTestBase.java | 2 +
.../recovery/TestLeveldbRMStateStore.java | 139 ++++
7 files changed, 849 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d208c901/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a570a0f..803a8db 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -24,6 +24,8 @@ Release 2.7.0 - UNRELEASED
YARN-2188. [YARN-1492] Client service for cache manager.
(Chris Trezzo and Sangjin Lee via kasha)
+ YARN-2765. Added leveldb-based implementation for RMStateStore. (Jason Lowe
+ via jianhe)
IMPROVEMENTS
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d208c901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d4d2fa9..61e2d0c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -506,6 +506,9 @@ public class YarnConfiguration extends Configuration {
public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
"2000, 500";
+ public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX
+ + "leveldb-state-store.path";
+
/** The maximum number of completed applications RM keeps. */
public static final String RM_MAX_COMPLETED_APPLICATIONS =
RM_PREFIX + "max-completed-applications";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d208c901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 193b00c..f78c3c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -420,6 +420,14 @@
</property>
<property>
+ <description>Local path where the RM state will be stored when using
+ org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore
+ as the value for yarn.resourcemanager.store.class</description>
+ <name>yarn.resourcemanager.leveldb-state-store.path</name>
+ <value>${hadoop.tmp.dir}/yarn/system/rmstore</value>
+ </property>
+
+ <property>
<description>Enable RM high-availability. When enabled,
(1) The RM starts in the Standby mode by default, and transitions to
the Active mode when prompted to.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d208c901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index 5d026d6..30695ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -179,6 +179,10 @@
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
+ <groupId>org.fusesource.leveldbjni</groupId>
+ <artifactId>leveldbjni-all</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<type>test-jar</type>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d208c901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
new file mode 100644
index 0000000..38ce370
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -0,0 +1,691 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import static org.fusesource.leveldbjni.JniDBFactory.asString;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.Logger;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class LeveldbRMStateStore extends RMStateStore {
+
+ /** Logger used by this state store. */
+ public static final Log LOG =
+ LogFactory.getLog(LeveldbRMStateStore.class);
+
+ // Delimiter placed between the components of every database key.
+ private static final String SEPARATOR = "/";
+ // Name of the database directory created under the configured store path
+ // (see getStorageDir()).
+ private static final String DB_NAME = "yarn-rm-state";
+ // Key prefix shared by all stored delegation token master keys; the key id
+ // is appended to it (see getRMDTMasterKeyNodeKey()).
+ private static final String RM_DT_MASTER_KEY_KEY_PREFIX =
+ RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_KEY_PREFIX;
+ // Key prefix shared by all stored delegation tokens; the token sequence
+ // number is appended to it (see getRMDTTokenNodeKey()).
+ private static final String RM_DT_TOKEN_KEY_PREFIX =
+ RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_TOKEN_PREFIX;
+ // Single key holding the latest delegation token sequence number.
+ private static final String RM_DT_SEQUENCE_NUMBER_KEY =
+ RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + "RMDTSequentialNumber";
+ // Key prefix used to seek to the first stored application in loadRMApps().
+ private static final String RM_APP_KEY_PREFIX =
+ RM_APP_ROOT + SEPARATOR + ApplicationId.appIdStrPrefix;
+
+ // Version written by storeVersion() and reported by getCurrentVersion().
+ private static final Version CURRENT_VERSION_INFO = Version
+ .newInstance(1, 0);
+
+ // Handle to the open database; assigned in startInternal() and reset to
+ // null by closeInternal() and deleteStore().
+ private DB db;
+
+ /**
+  * Builds the database key under which an application's state is stored.
+  *
+  * @param appId the application whose key is wanted
+  * @return the application root, a separator, and the application id
+  */
+ private String getApplicationNodeKey(ApplicationId appId) {
+   StringBuilder key = new StringBuilder();
+   key.append(RM_APP_ROOT).append(SEPARATOR).append(appId);
+   return key.toString();
+ }
+
+ /**
+  * Builds the database key for an application attempt, deriving the parent
+  * application key from the attempt itself.
+  *
+  * @param attemptId the attempt whose key is wanted
+  * @return the attempt key, nested under its application's key
+  */
+ private String getApplicationAttemptNodeKey(ApplicationAttemptId attemptId) {
+   String parentKey = getApplicationNodeKey(attemptId.getApplicationId());
+   return getApplicationAttemptNodeKey(parentKey, attemptId);
+ }
+
+ /**
+  * Builds the database key for an application attempt from an already
+  * computed application key.
+  *
+  * @param appNodeKey key of the owning application
+  * @param attemptId the attempt whose key is wanted
+  * @return the application key, a separator, and the attempt id
+  */
+ private String getApplicationAttemptNodeKey(String appNodeKey,
+     ApplicationAttemptId attemptId) {
+   StringBuilder key = new StringBuilder();
+   key.append(appNodeKey).append(SEPARATOR).append(attemptId);
+   return key.toString();
+ }
+
+ /**
+  * Builds the database key for a delegation token master key.
+  *
+  * @param masterKey the master key being stored or removed
+  * @return the master-key prefix followed by the key id
+  */
+ private String getRMDTMasterKeyNodeKey(DelegationKey masterKey) {
+   int keyId = masterKey.getKeyId();
+   return RM_DT_MASTER_KEY_KEY_PREFIX + keyId;
+ }
+
+ /**
+  * Builds the database key for a delegation token.
+  *
+  * @param tokenId identifier of the token being stored or removed
+  * @return the token prefix followed by the token's sequence number
+  */
+ private String getRMDTTokenNodeKey(RMDelegationTokenIdentifier tokenId) {
+   int sequenceNumber = tokenId.getSequenceNumber();
+   return RM_DT_TOKEN_KEY_PREFIX + sequenceNumber;
+ }
+
+ /**
+ * Intentionally empty: this store does nothing at init time. The database
+ * is opened later, in startInternal().
+ */
+ @Override
+ protected void initInternal(Configuration conf) throws Exception {
+ }
+
+ /**
+  * Resolves the directory that holds the state database.
+  *
+  * @return the configured store path with the database name appended
+  * @throws IOException if no store path is configured
+  */
+ private Path getStorageDir() throws IOException {
+   String configuredRoot =
+       getConfig().get(YarnConfiguration.RM_LEVELDB_STORE_PATH);
+   if (configuredRoot != null) {
+     return new Path(configuredRoot, DB_NAME);
+   }
+   throw new IOException("No store location directory configured in " +
+       YarnConfiguration.RM_LEVELDB_STORE_PATH);
+ }
+
+ /**
+  * Ensures the state database directory exists on the local file system,
+  * requesting owner-only (0700) permissions when it has to be created.
+  *
+  * @return the directory that holds the state database
+  * @throws IOException if no store path is configured or the directory
+  *         could not be created
+  */
+ private Path createStorageDir() throws IOException {
+   Path root = getStorageDir();
+   FileSystem fs = FileSystem.getLocal(getConfig());
+   // mkdirs reports failure through its return value; surface it here
+   // rather than letting the later database open fail with a vaguer error.
+   if (!fs.mkdirs(root, new FsPermission((short)0700))) {
+     throw new IOException("Unable to create state store directory " + root);
+   }
+   return root;
+ }
+
+ /**
+  * Opens the state database. An existing database is opened as-is; when
+  * none is found, a new one is created and stamped with the current
+  * version.
+  *
+  * @throws Exception if the directory cannot be prepared or the database
+  *         cannot be opened or created
+  */
+ @Override
+ protected void startInternal() throws Exception {
+   Path storeRoot = createStorageDir();
+   Options options = new Options();
+   options.createIfMissing(false);
+   options.logger(new LeveldbLogger());
+   LOG.info("Using state database at " + storeRoot + " for recovery");
+   File dbfile = new File(storeRoot.toString());
+   try {
+     db = JniDBFactory.factory.open(dbfile, options);
+   } catch (NativeDB.DBException e) {
+     boolean databaseMissing =
+         e.isNotFound() || e.getMessage().contains(" does not exist ");
+     if (!databaseMissing) {
+       throw e;
+     }
+     // First start against this path: create the database from scratch.
+     LOG.info("Creating state database at " + dbfile);
+     options.createIfMissing(true);
+     try {
+       db = JniDBFactory.factory.open(dbfile, options);
+       // store version
+       storeVersion();
+     } catch (DBException dbErr) {
+       throw new IOException(dbErr.getMessage(), dbErr);
+     }
+   }
+ }
+
+ /**
+  * Closes the database if it is open and clears the handle.
+  */
+ @Override
+ protected void closeInternal() throws Exception {
+   if (db == null) {
+     return;
+   }
+   db.close();
+   db = null;
+ }
+
+ /**
+  * Reports whether the database handle is currently unset.
+  *
+  * @return true when no database is open
+  */
+ @VisibleForTesting
+ boolean isClosed() {
+   DB handle = db;
+   return handle == null;
+ }
+
+ /**
+  * Reads the version record from the database.
+  *
+  * @return the stored version, or null when no version record exists
+  * @throws Exception if the database read or the record parsing fails
+  */
+ @Override
+ protected Version loadVersion() throws Exception {
+   byte[] raw;
+   try {
+     raw = db.get(bytes(VERSION_NODE));
+   } catch (DBException e) {
+     throw new IOException(e);
+   }
+   if (raw == null) {
+     return null;
+   }
+   return new VersionPBImpl(VersionProto.parseFrom(raw));
+ }
+
+ /**
+ * Writes CURRENT_VERSION_INFO to the database via dbStoreVersion().
+ */
+ @Override
+ protected void storeVersion() throws Exception {
+ dbStoreVersion(CURRENT_VERSION_INFO);
+ }
+
+ /**
+  * Writes the given version under the version key.
+  *
+  * @param state the version to persist; must be a VersionPBImpl
+  * @throws IOException if the database write fails
+  */
+ void dbStoreVersion(Version state) throws IOException {
+   VersionPBImpl versionPb = (VersionPBImpl) state;
+   byte[] serialized = versionPb.getProto().toByteArray();
+   try {
+     db.put(bytes(VERSION_NODE), serialized);
+   } catch (DBException e) {
+     throw new IOException(e);
+   }
+ }
+
+ /**
+ * Returns the version constant this implementation writes to new databases.
+ */
+ @Override
+ protected Version getCurrentVersion() {
+ return CURRENT_VERSION_INFO;
+ }
+
+ /**
+  * Returns the stored epoch (0 when none has been stored yet) and persists
+  * that value plus one for the next caller.
+  *
+  * @return the epoch value as it was before the increment
+  * @throws Exception if the database access or record parsing fails
+  */
+ @Override
+ public synchronized long getAndIncrementEpoch() throws Exception {
+   final byte[] epochKey = bytes(EPOCH_NODE);
+   long previousEpoch = 0;
+   try {
+     byte[] stored = db.get(epochKey);
+     if (stored != null) {
+       previousEpoch = EpochProto.parseFrom(stored).getEpoch();
+     }
+     byte[] next =
+         Epoch.newInstance(previousEpoch + 1).getProto().toByteArray();
+     db.put(epochKey, next);
+   } catch (DBException e) {
+     throw new IOException(e);
+   }
+   return previousEpoch;
+ }
+
+ /**
+  * Rebuilds the full RM state from the database: delegation token secret
+  * manager state, applications with their attempts, and AMRM token secret
+  * manager state, in that order.
+  *
+  * @return the recovered state
+  * @throws Exception if any part of the state cannot be read
+  */
+ @Override
+ public RMState loadState() throws Exception {
+   RMState recovered = new RMState();
+   loadRMDTSecretManagerState(recovered);
+   loadRMApps(recovered);
+   loadAMRMTokenSecretManagerState(recovered);
+   return recovered;
+ }
+
+ /**
+  * Recovers master keys, delegation tokens, and the token sequence number
+  * into the given state, logging how many keys and tokens were found.
+  *
+  * @param state the state object to populate
+  * @throws IOException if the database cannot be read
+  */
+ private void loadRMDTSecretManagerState(RMState state) throws IOException {
+   LOG.info("Recovered " + loadRMDTSecretManagerKeys(state)
+       + " RM delegation token master keys");
+   LOG.info("Recovered " + loadRMDTSecretManagerTokens(state)
+       + " RM delegation tokens");
+   loadRMDTSecretManagerTokenSequenceNumber(state);
+ }
+
+ /**
+  * Scans every entry whose key starts with the master-key prefix and adds
+  * the decoded delegation keys to the given state.
+  *
+  * @param state the state object to populate
+  * @return the number of master keys loaded
+  * @throws IOException if the database cannot be read or a key cannot be
+  *         decoded
+  */
+ private int loadRMDTSecretManagerKeys(RMState state) throws IOException {
+   int loaded = 0;
+   LeveldbIterator iter = null;
+   try {
+     iter = new LeveldbIterator(db);
+     iter.seek(bytes(RM_DT_MASTER_KEY_KEY_PREFIX));
+     // Keys are visited in order, so the first key outside the prefix ends
+     // the scan.
+     boolean withinPrefix = true;
+     while (withinPrefix && iter.hasNext()) {
+       Entry<byte[],byte[]> entry = iter.next();
+       String key = asString(entry.getKey());
+       withinPrefix = key.startsWith(RM_DT_MASTER_KEY_KEY_PREFIX);
+       if (withinPrefix) {
+         DelegationKey masterKey = loadDelegationKey(entry.getValue());
+         state.rmSecretManagerState.masterKeyState.add(masterKey);
+         ++loaded;
+         if (LOG.isDebugEnabled()) {
+           LOG.debug("Loaded RM delegation key from " + key
+               + ": keyId=" + masterKey.getKeyId()
+               + ", expirationDate=" + masterKey.getExpiryDate());
+         }
+       }
+     }
+   } catch (DBException e) {
+     throw new IOException(e);
+   } finally {
+     if (iter != null) {
+       iter.close();
+     }
+   }
+   return loaded;
+ }
+
+ /**
+  * Decodes a delegation key from its stored byte form.
+  *
+  * @param data the bytes read from the database
+  * @return the decoded key
+  * @throws IOException if the bytes cannot be decoded
+  */
+ private DelegationKey loadDelegationKey(byte[] data) throws IOException {
+   DelegationKey decoded = new DelegationKey();
+   DataInputStream stream =
+       new DataInputStream(new ByteArrayInputStream(data));
+   try {
+     decoded.readFields(stream);
+     return decoded;
+   } finally {
+     IOUtils.cleanup(LOG, stream);
+   }
+ }
+
+ /**
+  * Scans every entry whose key starts with the token prefix and records
+  * each decoded token identifier with its renew date in the given state.
+  *
+  * @param state the state object to populate
+  * @return the number of tokens loaded
+  * @throws IOException if the database cannot be read or a token cannot be
+  *         decoded
+  */
+ private int loadRMDTSecretManagerTokens(RMState state) throws IOException {
+   int loaded = 0;
+   LeveldbIterator iter = null;
+   try {
+     iter = new LeveldbIterator(db);
+     iter.seek(bytes(RM_DT_TOKEN_KEY_PREFIX));
+     // Keys are visited in order, so the first key outside the prefix ends
+     // the scan.
+     boolean withinPrefix = true;
+     while (withinPrefix && iter.hasNext()) {
+       Entry<byte[],byte[]> entry = iter.next();
+       String key = asString(entry.getKey());
+       withinPrefix = key.startsWith(RM_DT_TOKEN_KEY_PREFIX);
+       if (withinPrefix) {
+         RMDelegationTokenIdentifierData tokenData =
+             loadDelegationToken(entry.getValue());
+         RMDelegationTokenIdentifier tokenId = tokenData.getTokenIdentifier();
+         long renewDate = tokenData.getRenewDate();
+         state.rmSecretManagerState.delegationTokenState.put(tokenId,
+             renewDate);
+         ++loaded;
+         if (LOG.isDebugEnabled()) {
+           LOG.debug("Loaded RM delegation token from " + key
+               + ": tokenId=" + tokenId + ", renewDate=" + renewDate);
+         }
+       }
+     }
+   } catch (DBException e) {
+     throw new IOException(e);
+   } finally {
+     if (iter != null) {
+       iter.close();
+     }
+   }
+   return loaded;
+ }
+
+ /**
+  * Decodes a delegation token record from its stored byte form.
+  *
+  * @param data the bytes read from the database
+  * @return the decoded token data
+  * @throws IOException if the bytes cannot be decoded
+  */
+ private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data)
+     throws IOException {
+   RMDelegationTokenIdentifierData decoded =
+       new RMDelegationTokenIdentifierData();
+   DataInputStream stream =
+       new DataInputStream(new ByteArrayInputStream(data));
+   try {
+     decoded.readFields(stream);
+     return decoded;
+   } finally {
+     IOUtils.cleanup(LOG, stream);
+   }
+ }
+
+ /**
+  * Restores the delegation token sequence number, if one was stored. The
+  * state is left untouched when the database holds no sequence number.
+  *
+  * @param state the state object to populate
+  * @throws IOException if the database cannot be read or the value cannot
+  *         be decoded
+  */
+ private void loadRMDTSecretManagerTokenSequenceNumber(RMState state)
+     throws IOException {
+   byte[] raw;
+   try {
+     raw = db.get(bytes(RM_DT_SEQUENCE_NUMBER_KEY));
+   } catch (DBException e) {
+     throw new IOException(e);
+   }
+   if (raw == null) {
+     return;
+   }
+   DataInputStream stream =
+       new DataInputStream(new ByteArrayInputStream(raw));
+   try {
+     state.rmSecretManagerState.dtSequenceNumber = stream.readInt();
+   } finally {
+     IOUtils.cleanup(LOG, stream);
+   }
+ }
+
+ /**
+  * Walks all application entries in the database and loads each
+  * application together with its attempts into the given state.
+  *
+  * @param state the state object to populate
+  * @throws IOException if the database cannot be read or an entry cannot
+  *         be decoded
+  */
+ private void loadRMApps(RMState state) throws IOException {
+   int appCount = 0;
+   int attemptCount = 0;
+   LeveldbIterator iter = null;
+   try {
+     iter = new LeveldbIterator(db);
+     iter.seek(bytes(RM_APP_KEY_PREFIX));
+     while (iter.hasNext()) {
+       Entry<byte[],byte[]> entry = iter.next();
+       String key = asString(entry.getKey());
+       if (!key.startsWith(RM_APP_KEY_PREFIX)) {
+         break;
+       }
+
+       // Whatever follows "<app root>/" should be a bare application id; a
+       // further separator means this is not a top-level application entry.
+       String appIdStr = key.substring(RM_APP_ROOT.length() + 1);
+       if (appIdStr.contains(SEPARATOR)) {
+         LOG.warn("Skipping extraneous data " + key);
+       } else {
+         attemptCount += loadRMApp(state, iter, appIdStr, entry.getValue());
+         ++appCount;
+       }
+     }
+   } catch (DBException e) {
+     throw new IOException(e);
+   } finally {
+     if (iter != null) {
+       iter.close();
+     }
+   }
+   LOG.info("Recovered " + appCount + " applications and " + attemptCount
+       + " application attempts");
+ }
+
+ /**
+ * Loads one application and the attempt entries that follow it in the
+ * iterator. The caller has already consumed the application's own entry and
+ * passes its value as appData.
+ *
+ * @param rmState state object that receives the application
+ * @param iter iterator positioned just after the application's entry
+ * @param appIdStr application id taken from the entry's key
+ * @param appData serialized application state from the entry's value
+ * @return the number of attempts recorded for the application
+ * @throws IOException if the application or an attempt cannot be decoded
+ */
+ private int loadRMApp(RMState rmState, LeveldbIterator iter, String appIdStr,
+ byte[] appData) throws IOException {
+ ApplicationStateData appState = createApplicationState(appIdStr, appData);
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
+ rmState.appState.put(appId, appState);
+ // Entries belonging to this application have keys of the form
+ // "<application key>/<something>".
+ String attemptNodePrefix = getApplicationNodeKey(appId) + SEPARATOR;
+ while (iter.hasNext()) {
+ // Peek rather than advance so that the first entry not belonging to this
+ // application stays in the iterator for the caller to process.
+ Entry<byte[],byte[]> entry = iter.peekNext();
+ String key = asString(entry.getKey());
+ if (!key.startsWith(attemptNodePrefix)) {
+ break;
+ }
+
+ String attemptId = key.substring(attemptNodePrefix.length());
+ if (attemptId.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
+ ApplicationAttemptStateData attemptState =
+ createAttemptState(attemptId, entry.getValue());
+ appState.attempts.put(attemptState.getAttemptId(), attemptState);
+ } else {
+ LOG.warn("Ignoring unknown application key: " + key);
+ }
+ // The peeked entry was handled (or deliberately ignored); consume it.
+ iter.next();
+ }
+ int numAttempts = appState.attempts.size();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loaded application " + appId + " with " + numAttempts
+ + " attempts");
+ }
+ return numAttempts;
+ }
+
+ /**
+  * Decodes application state and verifies that it belongs to the
+  * application named by the database key.
+  *
+  * @param appIdStr application id taken from the database key
+  * @param data serialized application state
+  * @return the decoded application state
+  * @throws IOException if the data cannot be parsed
+  * @throws YarnRuntimeException if the decoded state names a different
+  *         application than the key
+  */
+ private ApplicationStateData createApplicationState(String appIdStr,
+     byte[] data) throws IOException {
+   ApplicationId expectedId = ConverterUtils.toApplicationId(appIdStr);
+   ApplicationStateDataPBImpl decoded =
+       new ApplicationStateDataPBImpl(
+           ApplicationStateDataProto.parseFrom(data));
+   ApplicationId storedId =
+       decoded.getApplicationSubmissionContext().getApplicationId();
+   if (expectedId.equals(storedId)) {
+     return decoded;
+   }
+   throw new YarnRuntimeException("The database entry for " + expectedId
+       + " contains data for "
+       + storedId);
+ }
+
+ /**
+  * Reads a single application's state directly from the database.
+  *
+  * @param appId the application to look up
+  * @return the decoded state, or null when the database has no entry
+  * @throws IOException if the database read or the decoding fails
+  */
+ @VisibleForTesting
+ ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException {
+   byte[] raw;
+   try {
+     raw = db.get(bytes(getApplicationNodeKey(appId)));
+   } catch (DBException e) {
+     throw new IOException(e);
+   }
+   return (raw == null) ? null
+       : createApplicationState(appId.toString(), raw);
+ }
+
+ /**
+  * Decodes attempt state and verifies that it belongs to the attempt named
+  * by the database key.
+  *
+  * @param itemName attempt id taken from the database key
+  * @param data serialized attempt state
+  * @return the decoded attempt state
+  * @throws IOException if the data cannot be parsed
+  * @throws YarnRuntimeException if the decoded state names a different
+  *         attempt than the key
+  */
+ private ApplicationAttemptStateData createAttemptState(String itemName,
+     byte[] data) throws IOException {
+   ApplicationAttemptId expectedId =
+       ConverterUtils.toApplicationAttemptId(itemName);
+   ApplicationAttemptStateDataPBImpl decoded =
+       new ApplicationAttemptStateDataPBImpl(
+           ApplicationAttemptStateDataProto.parseFrom(data));
+   if (expectedId.equals(decoded.getAttemptId())) {
+     return decoded;
+   }
+   throw new YarnRuntimeException("The database entry for " + expectedId
+       + " contains data for " + decoded.getAttemptId());
+ }
+
+ /**
+  * Restores the AMRM token secret manager state, if any was stored. The
+  * state is left untouched when the database holds no such record.
+  *
+  * @param rmState the state object to populate
+  * @throws IOException if the database read or the decoding fails
+  */
+ private void loadAMRMTokenSecretManagerState(RMState rmState)
+     throws IOException {
+   try {
+     byte[] raw = db.get(bytes(AMRMTOKEN_SECRET_MANAGER_ROOT));
+     if (raw == null) {
+       return;
+     }
+     AMRMTokenSecretManagerStatePBImpl stored =
+         new AMRMTokenSecretManagerStatePBImpl(
+             AMRMTokenSecretManagerStateProto.parseFrom(raw));
+     rmState.amrmTokenSecretManagerState =
+         AMRMTokenSecretManagerState.newInstance(
+             stored.getCurrentMasterKey(),
+             stored.getNextMasterKey());
+   } catch (DBException e) {
+     throw new IOException(e);
+   }
+ }
+
+ @Override
+ protected void storeApplicationStateInternal(ApplicationId appId,
+ ApplicationStateData appStateData) throws IOException {
+ String key = getApplicationNodeKey(appId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing state for app " + appId + " at " + key);
+ }
+ try {
+ db.put(bytes(key), appStateData.getProto().toByteArray());
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Updates an application's state by delegating to
+ * storeApplicationStateInternal(), which writes to the same key.
+ */
+ @Override
+ protected void updateApplicationStateInternal(ApplicationId appId,
+ ApplicationStateData appStateData) throws IOException {
+ storeApplicationStateInternal(appId, appStateData);
+ }
+
+ @Override
+ protected void storeApplicationAttemptStateInternal(
+ ApplicationAttemptId attemptId,
+ ApplicationAttemptStateData attemptStateData) throws IOException {
+ String key = getApplicationAttemptNodeKey(attemptId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing state for attempt " + attemptId + " at " + key);
+ }
+ try {
+ db.put(bytes(key), attemptStateData.getProto().toByteArray());
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Updates an attempt's state by delegating to
+ * storeApplicationAttemptStateInternal(), which writes to the same key.
+ */
+ @Override
+ protected void updateApplicationAttemptStateInternal(
+ ApplicationAttemptId attemptId,
+ ApplicationAttemptStateData attemptStateData) throws IOException {
+ storeApplicationAttemptStateInternal(attemptId, attemptStateData);
+ }
+
+ @Override
+ protected void removeApplicationStateInternal(ApplicationStateData appState)
+ throws IOException {
+ ApplicationId appId =
+ appState.getApplicationSubmissionContext().getApplicationId();
+ String appKey = getApplicationNodeKey(appId);
+ try {
+ WriteBatch batch = db.createWriteBatch();
+ try {
+ batch.delete(bytes(appKey));
+ for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
+ String attemptKey = getApplicationAttemptNodeKey(appKey, attemptId);
+ batch.delete(bytes(attemptKey));
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing state for app " + appId + " and "
+ + appState.attempts.size() + " attempts" + " at " + appKey);
+ }
+ db.write(batch);
+ } finally {
+ batch.close();
+ }
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ protected void storeRMDelegationTokenAndSequenceNumberState(
+ RMDelegationTokenIdentifier tokenId, Long renewDate,
+ int latestSequenceNumber) throws IOException {
+ String tokenKey = getRMDTTokenNodeKey(tokenId);
+ RMDelegationTokenIdentifierData tokenData =
+ new RMDelegationTokenIdentifierData(tokenId, renewDate);
+ ByteArrayOutputStream bs = new ByteArrayOutputStream();
+ DataOutputStream ds = new DataOutputStream(bs);
+ try {
+ ds.writeInt(latestSequenceNumber);
+ } finally {
+ ds.close();
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing token to " + tokenKey);
+ LOG.debug("Storing " + latestSequenceNumber + " to "
+ + RM_DT_SEQUENCE_NUMBER_KEY);
+ }
+ try {
+ WriteBatch batch = db.createWriteBatch();
+ try {
+ batch.put(bytes(tokenKey), tokenData.toByteArray());
+ batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray());
+ db.write(batch);
+ } finally {
+ batch.close();
+ }
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Updates a delegation token and the sequence number by delegating to
+ * storeRMDelegationTokenAndSequenceNumberState(), which writes to the same
+ * keys.
+ */
+ @Override
+ protected void updateRMDelegationTokenAndSequenceNumberInternal(
+ RMDelegationTokenIdentifier tokenId, Long renewDate,
+ int latestSequenceNumber) throws IOException {
+ storeRMDelegationTokenAndSequenceNumberState(tokenId, renewDate,
+ latestSequenceNumber);
+ }
+
+ @Override
+ protected void removeRMDelegationTokenState(
+ RMDelegationTokenIdentifier tokenId) throws IOException {
+ String tokenKey = getRMDTTokenNodeKey(tokenId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing token at " + tokenKey);
+ }
+ try {
+ db.delete(bytes(tokenKey));
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ protected void storeRMDTMasterKeyState(DelegationKey masterKey)
+ throws IOException {
+ String dbKey = getRMDTMasterKeyNodeKey(masterKey);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing token master key to " + dbKey);
+ }
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(os);
+ try {
+ masterKey.write(out);
+ } finally {
+ out.close();
+ }
+ try {
+ db.put(bytes(dbKey), os.toByteArray());
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ protected void removeRMDTMasterKeyState(DelegationKey masterKey)
+ throws IOException {
+ String dbKey = getRMDTMasterKeyNodeKey(masterKey);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removing token master key at " + dbKey);
+ }
+ try {
+ db.delete(bytes(dbKey));
+ } catch (DBException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+  * Writes the AMRM token secret manager state under its fixed key. Stores
+  * and updates are handled identically, so isUpdate is not consulted. A
+  * database failure is passed to notifyStoreOperationFailed() instead of
+  * being thrown.
+  *
+  * @param state the state to persist
+  * @param isUpdate unused by this implementation
+  */
+ @Override
+ public void storeOrUpdateAMRMTokenSecretManagerState(
+     AMRMTokenSecretManagerState state, boolean isUpdate) {
+   AMRMTokenSecretManagerState snapshot =
+       AMRMTokenSecretManagerState.newInstance(state);
+   byte[] serialized = snapshot.getProto().toByteArray();
+   try {
+     db.put(bytes(AMRMTOKEN_SECRET_MANAGER_ROOT), serialized);
+   } catch (DBException e) {
+     notifyStoreOperationFailed(e);
+   }
+ }
+
+ /**
+  * Closes the database (if it is open) and recursively deletes its
+  * directory from the local file system.
+  *
+  * @throws IOException if the store path is not configured, or closing the
+  *         database or deleting the directory fails
+  */
+ @Override
+ public void deleteStore() throws IOException {
+   Path root = getStorageDir();
+   LOG.info("Deleting state database at " + root);
+   // The store may never have been started or may already be closed; guard
+   // the handle the same way closeInternal() does to avoid an NPE.
+   if (db != null) {
+     db.close();
+     db = null;
+   }
+   FileSystem fs = FileSystem.getLocal(getConfig());
+   fs.delete(root, true);
+ }
+
+ /**
+  * Counts every entry in the database, logging each key at INFO level.
+  *
+  * @return the total number of entries
+  * @throws IOException if the database cannot be read
+  */
+ @VisibleForTesting
+ int getNumEntriesInDatabase() throws IOException {
+   int total = 0;
+   LeveldbIterator iter = null;
+   try {
+     iter = new LeveldbIterator(db);
+     for (iter.seekToFirst(); iter.hasNext(); ++total) {
+       Entry<byte[],byte[]> entry = iter.next();
+       LOG.info("entry: " + asString(entry.getKey()));
+     }
+   } catch (DBException e) {
+     throw new IOException(e);
+   } finally {
+     if (iter != null) {
+       iter.close();
+     }
+   }
+   return total;
+ }
+
+ /**
+  * Adapter that forwards leveldb's log messages to a commons-logging
+  * logger at INFO level.
+  */
+ private static class LeveldbLogger implements Logger {
+   private static final Log LEVELDB_LOG =
+       LogFactory.getLog(LeveldbLogger.class);
+
+   @Override
+   public void log(String message) {
+     LEVELDB_LOG.info(message);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d208c901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 8d6a7b6..3d07b37 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -621,6 +621,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
// load state
store = stateStoreHelper.getRMStateStore();
+ when(rmContext.getStateStore()).thenReturn(store);
store.setRMDispatcher(dispatcher);
RMState state = store.loadState();
Assert.assertNotNull(state.getAMRMTokenSecretManagerState());
@@ -640,6 +641,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
// load state
store = stateStoreHelper.getRMStateStore();
+ when(rmContext.getStateStore()).thenReturn(store);
store.setRMDispatcher(dispatcher);
RMState state_2 = store.loadState();
Assert.assertNotNull(state_2.getAMRMTokenSecretManagerState());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d208c901/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
new file mode 100644
index 0000000..ae885d2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
+
+ private static final File TEST_DIR = new File(
+ System.getProperty("test.build.data",
+ System.getProperty("java.io.tmpdir")),
+ TestLeveldbRMStateStore.class.getName());
+
+ private YarnConfiguration conf;
+ private LeveldbRMStateStore stateStore = null;
+
+ @Before
+ public void setup() throws IOException {
+ FileUtil.fullyDelete(TEST_DIR);
+ conf = new YarnConfiguration();
+ conf.set(YarnConfiguration.RM_LEVELDB_STORE_PATH, TEST_DIR.toString());
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ if (stateStore != null) {
+ stateStore.close();
+ }
+ FileUtil.fullyDelete(TEST_DIR);
+ }
+
+ @Test(timeout = 60000)
+ public void testApps() throws Exception {
+ LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
+ testRMAppStateStore(tester);
+ }
+
+ @Test(timeout = 60000)
+ public void testClientTokens() throws Exception {
+ LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
+ testRMDTSecretManagerStateStore(tester);
+ }
+
+ @Test(timeout = 60000)
+ public void testVersion() throws Exception {
+ LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
+ testCheckVersion(tester);
+ }
+
+ @Test(timeout = 60000)
+ public void testEpoch() throws Exception {
+ LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
+ testEpoch(tester);
+ }
+
+ @Test(timeout = 60000)
+ public void testAppDeletion() throws Exception {
+ LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
+ testAppDeletion(tester);
+ }
+
+ @Test(timeout = 60000)
+ public void testDeleteStore() throws Exception {
+ LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
+ testDeleteStore(tester);
+ }
+
+ @Test(timeout = 60000)
+ public void testAMTokens() throws Exception {
+ LeveldbStateStoreTester tester = new LeveldbStateStoreTester();
+ testAMRMTokenSecretManagerStateStore(tester);
+ }
+
+ class LeveldbStateStoreTester implements RMStateStoreHelper {
+
+ @Override
+ public RMStateStore getRMStateStore() throws Exception {
+ if (stateStore != null) {
+ stateStore.close();
+ }
+ stateStore = new LeveldbRMStateStore();
+ stateStore.init(conf);
+ stateStore.start();
+ return stateStore;
+ }
+
+ @Override
+ public boolean isFinalStateValid() throws Exception {
+ // There should be 6 total entries:
+ // 1 entry for version
+ // 2 entries for app 0010 with one attempt
+ // 3 entries for app 0001 with two attempts
+ return stateStore.getNumEntriesInDatabase() == 6;
+ }
+
+ @Override
+ public void writeVersion(Version version) throws Exception {
+ stateStore.dbStoreVersion(version);
+ }
+
+ @Override
+ public Version getCurrentVersion() throws Exception {
+ return stateStore.getCurrentVersion();
+ }
+
+ @Override
+ public boolean appExists(RMApp app) throws Exception {
+ if (stateStore.isClosed()) {
+ getRMStateStore();
+ }
+ return stateStore.loadRMAppState(app.getApplicationId()) != null;
+ }
+ }
+}