You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/12/03 06:23:58 UTC

[26/50] [abbrv] hadoop git commit: YARN-2765. Added leveldb-based implementation for RMStateStore. Contributed by Jason Lowe

YARN-2765. Added leveldb-based implementation for RMStateStore. Contributed by Jason Lowe


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a7fba0bc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a7fba0bc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a7fba0bc

Branch: refs/heads/HDFS-EC
Commit: a7fba0bc28764e0fb36c335ea60cc58079fe007f
Parents: 031f980
Author: Jian He <ji...@apache.org>
Authored: Mon Dec 1 16:38:25 2014 -0800
Committer: Jian He <ji...@apache.org>
Committed: Mon Dec 1 16:38:25 2014 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   2 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |   3 +
 .../src/main/resources/yarn-default.xml         |   8 +
 .../hadoop-yarn-server-resourcemanager/pom.xml  |   4 +
 .../recovery/LeveldbRMStateStore.java           | 691 +++++++++++++++++++
 .../recovery/RMStateStoreTestBase.java          |   2 +
 .../recovery/TestLeveldbRMStateStore.java       | 139 ++++
 7 files changed, 849 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7fba0bc/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 755763a..6a7d55d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -51,6 +51,8 @@ Release 2.7.0 - UNRELEASED
     YARN-2188. [YARN-1492] Client service for cache manager. 
     (Chris Trezzo and Sangjin Lee via kasha)
 
+    YARN-2765. Added leveldb-based implementation for RMStateStore. (Jason Lowe
+    via jianhe)
 
   IMPROVEMENTS
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7fba0bc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 52bc821..41f85ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -506,6 +506,9 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
       "2000, 500";
 
+  public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX
+      + "leveldb-state-store.path";
+
   /** The maximum number of completed applications RM keeps. */ 
   public static final String RM_MAX_COMPLETED_APPLICATIONS =
     RM_PREFIX + "max-completed-applications";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7fba0bc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 2f549da..54c4dbc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -420,6 +420,14 @@
   </property>
 
   <property>
+    <description>Local path where the RM state will be stored when using
+    org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore
+    as the value for yarn.resourcemanager.store.class</description>
+    <name>yarn.resourcemanager.leveldb-state-store.path</name>
+    <value>${hadoop.tmp.dir}/yarn/system/rmstore</value>
+  </property>
+
+  <property>
     <description>Enable RM high-availability. When enabled,
       (1) The RM starts in the Standby mode by default, and transitions to
       the Active mode when prompted to.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7fba0bc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index 9496517..9bcc7c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -179,6 +179,10 @@
       <artifactId>zookeeper</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
       <type>test-jar</type>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7fba0bc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
new file mode 100644
index 0000000..38ce370
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/LeveldbRMStateStore.java
@@ -0,0 +1,691 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import static org.fusesource.leveldbjni.JniDBFactory.asString;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.Logger;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class LeveldbRMStateStore extends RMStateStore {
+
+  public static final Log LOG =  // logger shared by all store instances
+      LogFactory.getLog(LeveldbRMStateStore.class);
+
+  private static final String SEPARATOR = "/";  // joins key components
+  private static final String DB_NAME = "yarn-rm-state";  // dir under store path
+  private static final String RM_DT_MASTER_KEY_KEY_PREFIX =  // master key entries
+      RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_KEY_PREFIX;
+  private static final String RM_DT_TOKEN_KEY_PREFIX =  // delegation token entries
+      RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_TOKEN_PREFIX;
+  private static final String RM_DT_SEQUENCE_NUMBER_KEY =  // single sequence entry
+      RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + "RMDTSequentialNumber";
+  private static final String RM_APP_KEY_PREFIX =  // seek target for app recovery
+      RM_APP_ROOT + SEPARATOR + ApplicationId.appIdStrPrefix;
+
+  private static final Version CURRENT_VERSION_INFO = Version  // written on create
+      .newInstance(1, 0);
+
+  private DB db;  // open handle; null before start and after close/deleteStore
+
+  /**
+   * Builds the database key under which an application's state is stored.
+   *
+   * @param appId the application whose key is wanted
+   * @return the application root, the separator and the string form of
+   *         {@code appId}, concatenated in that order
+   */
+  private String getApplicationNodeKey(ApplicationId appId) {
+    StringBuilder key = new StringBuilder();
+    key.append(RM_APP_ROOT).append(SEPARATOR).append(appId);
+    return key.toString();
+  }
+
+  /**
+   * Builds the database key for an attempt, nesting it beneath the key of the
+   * application the attempt belongs to.
+   *
+   * @param attemptId the attempt whose key is wanted
+   * @return the attempt's database key
+   */
+  private String getApplicationAttemptNodeKey(ApplicationAttemptId attemptId) {
+    ApplicationId owningApp = attemptId.getApplicationId();
+    String appNodeKey = getApplicationNodeKey(owningApp);
+    return getApplicationAttemptNodeKey(appNodeKey, attemptId);
+  }
+
+  /**
+   * Appends the separator and the attempt id to an already computed
+   * application key.
+   *
+   * @param appNodeKey database key of the owning application
+   * @param attemptId the attempt whose key is wanted
+   * @return the attempt's database key
+   */
+  private String getApplicationAttemptNodeKey(String appNodeKey,
+      ApplicationAttemptId attemptId) {
+    StringBuilder key = new StringBuilder();
+    key.append(appNodeKey).append(SEPARATOR).append(attemptId);
+    return key.toString();
+  }
+
+  /**
+   * Builds the database key for a delegation token master key from the
+   * master key prefix and the key's id.
+   *
+   * @param masterKey the master key to address
+   * @return the database key for {@code masterKey}
+   */
+  private String getRMDTMasterKeyNodeKey(DelegationKey masterKey) {
+    String keyIdText = String.valueOf(masterKey.getKeyId());
+    return RM_DT_MASTER_KEY_KEY_PREFIX + keyIdText;
+  }
+
+  /**
+   * Builds the database key for a delegation token from the token prefix and
+   * the token's sequence number.
+   *
+   * @param tokenId identifier of the token to address
+   * @return the database key for {@code tokenId}
+   */
+  private String getRMDTTokenNodeKey(RMDelegationTokenIdentifier tokenId) {
+    String sequenceText = String.valueOf(tokenId.getSequenceNumber());
+    return RM_DT_TOKEN_KEY_PREFIX + sequenceText;
+  }
+
+  /**
+   * Performs no initialization work for this store.
+   *
+   * @param conf configuration handed in by the caller; not read here
+   */
+  @Override
+  protected void initInternal(Configuration conf) throws Exception {
+    // Intentionally empty: the database is opened in startInternal().
+  }
+
+  /**
+   * Resolves the directory that holds the database: the configured store
+   * path with the database name appended.
+   *
+   * @return path of the database directory
+   * @throws IOException if no store path is configured
+   */
+  private Path getStorageDir() throws IOException {
+    String configuredRoot =
+        getConfig().get(YarnConfiguration.RM_LEVELDB_STORE_PATH);
+    if (configuredRoot != null) {
+      return new Path(configuredRoot, DB_NAME);
+    }
+    throw new IOException("No store location directory configured in " +
+        YarnConfiguration.RM_LEVELDB_STORE_PATH);
+  }
+
+  /**
+   * Ensures the database directory exists on the local file system,
+   * requesting owner-only (0700) permissions when it is created.
+   *
+   * @return path of the database directory
+   * @throws IOException if no store path is configured or the directory
+   *         could not be created
+   */
+  private Path createStorageDir() throws IOException {
+    Path root = getStorageDir();
+    FileSystem fs = FileSystem.getLocal(getConfig());
+    // mkdirs reports failure through its return value as well as through
+    // exceptions; fail here rather than with an obscure leveldb open error.
+    if (!fs.mkdirs(root, new FsPermission((short)0700))) {
+      throw new IOException("Unable to create state store directory " + root);
+    }
+    return root;
+  }
+
+  /**
+   * Opens the leveldb database in the storage directory. When the database
+   * does not exist yet it is created and the current version is stored in it.
+   *
+   * @throws Exception if the directory or the database cannot be set up
+   */
+  @Override
+  protected void startInternal() throws Exception {
+    Path storeRoot = createStorageDir();
+    File dbfile = new File(storeRoot.toString());
+    Options options = new Options();
+    options.createIfMissing(false);
+    options.logger(new LeveldbLogger());
+    LOG.info("Using state database at " + storeRoot + " for recovery");
+    try {
+      // The first attempt never creates, so a missing database is detectable.
+      db = JniDBFactory.factory.open(dbfile, options);
+    } catch (NativeDB.DBException e) {
+      boolean dbMissing =
+          e.isNotFound() || e.getMessage().contains(" does not exist ");
+      if (!dbMissing) {
+        throw e;
+      }
+      LOG.info("Creating state database at " + dbfile);
+      options.createIfMissing(true);
+      try {
+        db = JniDBFactory.factory.open(dbfile, options);
+        // store version
+        storeVersion();
+      } catch (DBException dbErr) {
+        throw new IOException(dbErr.getMessage(), dbErr);
+      }
+    }
+  }
+
+  /**
+   * Closes the database handle if one is open and forgets it.
+   *
+   * @throws Exception if closing the database fails
+   */
+  @Override
+  protected void closeInternal() throws Exception {
+    if (db == null) {
+      return;
+    }
+    db.close();
+    db = null;
+  }
+
+  /**
+   * Reports whether this store currently holds no database handle.
+   *
+   * @return {@code true} when the handle is {@code null}
+   */
+  @VisibleForTesting
+  boolean isClosed() {
+    return null == db;
+  }
+
+  /**
+   * Reads the version entry from the database.
+   *
+   * @return the stored version, or {@code null} when no entry exists
+   * @throws Exception if the read fails or the entry cannot be parsed
+   */
+  @Override
+  protected Version loadVersion() throws Exception {
+    try {
+      byte[] stored = db.get(bytes(VERSION_NODE));
+      if (stored == null) {
+        return null;
+      }
+      return new VersionPBImpl(VersionProto.parseFrom(stored));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Writes the current version of this store implementation to the database.
+   *
+   * @throws Exception if the write fails
+   */
+  @Override
+  protected void storeVersion() throws Exception {
+    Version current = CURRENT_VERSION_INFO;
+    dbStoreVersion(current);
+  }
+
+  /**
+   * Serializes the given version and writes it to the version entry.
+   *
+   * @param state version to persist; cast to {@code VersionPBImpl}
+   * @throws IOException if the database write fails
+   */
+  void dbStoreVersion(Version state) throws IOException {
+    VersionPBImpl pbVersion = (VersionPBImpl) state;
+    byte[] serialized = pbVersion.getProto().toByteArray();
+    try {
+      db.put(bytes(VERSION_NODE), serialized);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Returns the version this implementation writes to new databases.
+   *
+   * @return the current version constant
+   */
+  @Override
+  protected Version getCurrentVersion() {
+    Version current = CURRENT_VERSION_INFO;
+    return current;
+  }
+
+  /**
+   * Returns the epoch stored in the database (0 when none is stored) and
+   * writes that value plus one back.
+   *
+   * @return the epoch as it was before the increment
+   * @throws Exception if the database access or parsing fails
+   */
+  @Override
+  public synchronized long getAndIncrementEpoch() throws Exception {
+    byte[] epochKey = bytes(EPOCH_NODE);
+    long currentEpoch;
+    try {
+      byte[] stored = db.get(epochKey);
+      if (stored == null) {
+        currentEpoch = 0;
+      } else {
+        currentEpoch = EpochProto.parseFrom(stored).getEpoch();
+      }
+      EpochProto next = Epoch.newInstance(currentEpoch + 1).getProto();
+      db.put(epochKey, next.toByteArray());
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+    return currentEpoch;
+  }
+
+  /**
+   * Builds a fresh {@code RMState} from the database: delegation token
+   * secret manager state first, then applications, then the AMRM token
+   * secret manager state.
+   *
+   * @return the recovered state
+   * @throws Exception if any part of the state cannot be loaded
+   */
+  @Override
+  public RMState loadState() throws Exception {
+    RMState recovered = new RMState();
+    loadRMDTSecretManagerState(recovered);
+    loadRMApps(recovered);
+    loadAMRMTokenSecretManagerState(recovered);
+    return recovered;
+  }
+
+  /**
+   * Loads delegation token master keys, delegation tokens and the token
+   * sequence number into the given state, logging how many of each of the
+   * first two were recovered.
+   *
+   * @param state recovery state to populate
+   * @throws IOException if reading from the database fails
+   */
+  private void loadRMDTSecretManagerState(RMState state) throws IOException {
+    LOG.info("Recovered " + loadRMDTSecretManagerKeys(state)
+        + " RM delegation token master keys");
+    LOG.info("Recovered " + loadRMDTSecretManagerTokens(state)
+        + " RM delegation tokens");
+    loadRMDTSecretManagerTokenSequenceNumber(state);
+  }
+
+  /**
+   * Scans the entries whose keys start with the master key prefix and adds
+   * each decoded master key to the given state.
+   *
+   * @param state recovery state that receives the master keys
+   * @return number of master keys loaded
+   * @throws IOException if the scan fails or an entry cannot be decoded
+   */
+  private int loadRMDTSecretManagerKeys(RMState state) throws IOException {
+    int loaded = 0;
+    LeveldbIterator cursor = null;
+    try {
+      cursor = new LeveldbIterator(db);
+      cursor.seek(bytes(RM_DT_MASTER_KEY_KEY_PREFIX));
+      // Stop at the first key that no longer carries the prefix.
+      boolean inRange = true;
+      while (inRange && cursor.hasNext()) {
+        Entry<byte[],byte[]> record = cursor.next();
+        String dbKey = asString(record.getKey());
+        inRange = dbKey.startsWith(RM_DT_MASTER_KEY_KEY_PREFIX);
+        if (inRange) {
+          DelegationKey masterKey = loadDelegationKey(record.getValue());
+          state.rmSecretManagerState.masterKeyState.add(masterKey);
+          ++loaded;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Loaded RM delegation key from " + dbKey
+                + ": keyId=" + masterKey.getKeyId()
+                + ", expirationDate=" + masterKey.getExpiryDate());
+          }
+        }
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (cursor != null) {
+        cursor.close();
+      }
+    }
+    return loaded;
+  }
+
+  /**
+   * Decodes a delegation key from its serialized form.
+   *
+   * @param data bytes previously produced by {@code DelegationKey.write}
+   * @return the decoded key
+   * @throws IOException if the bytes cannot be read as a key
+   */
+  private DelegationKey loadDelegationKey(byte[] data) throws IOException {
+    DelegationKey decoded = new DelegationKey();
+    ByteArrayInputStream raw = new ByteArrayInputStream(data);
+    DataInputStream in = new DataInputStream(raw);
+    try {
+      decoded.readFields(in);
+      return decoded;
+    } finally {
+      IOUtils.cleanup(LOG, in);
+    }
+  }
+
+  /**
+   * Scans the entries whose keys start with the delegation token prefix and
+   * records each token with its renew date in the given state.
+   *
+   * @param state recovery state that receives the tokens
+   * @return number of tokens loaded
+   * @throws IOException if the scan fails or an entry cannot be decoded
+   */
+  private int loadRMDTSecretManagerTokens(RMState state) throws IOException {
+    int loaded = 0;
+    LeveldbIterator cursor = null;
+    try {
+      cursor = new LeveldbIterator(db);
+      cursor.seek(bytes(RM_DT_TOKEN_KEY_PREFIX));
+      // Stop at the first key that no longer carries the prefix.
+      boolean inRange = true;
+      while (inRange && cursor.hasNext()) {
+        Entry<byte[],byte[]> record = cursor.next();
+        String dbKey = asString(record.getKey());
+        inRange = dbKey.startsWith(RM_DT_TOKEN_KEY_PREFIX);
+        if (inRange) {
+          RMDelegationTokenIdentifierData tokenData =
+              loadDelegationToken(record.getValue());
+          RMDelegationTokenIdentifier tokenId =
+              tokenData.getTokenIdentifier();
+          long renewDate = tokenData.getRenewDate();
+          state.rmSecretManagerState.delegationTokenState.put(tokenId,
+              renewDate);
+          ++loaded;
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Loaded RM delegation token from " + dbKey
+                + ": tokenId=" + tokenId + ", renewDate=" + renewDate);
+          }
+        }
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (cursor != null) {
+        cursor.close();
+      }
+    }
+    return loaded;
+  }
+
+  /**
+   * Decodes a delegation token record from its serialized form.
+   *
+   * @param data serialized token record
+   * @return the decoded record
+   * @throws IOException if the bytes cannot be read as a token record
+   */
+  private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data)
+      throws IOException {
+    RMDelegationTokenIdentifierData decoded =
+        new RMDelegationTokenIdentifierData();
+    ByteArrayInputStream raw = new ByteArrayInputStream(data);
+    DataInputStream in = new DataInputStream(raw);
+    try {
+      decoded.readFields(in);
+      return decoded;
+    } finally {
+      IOUtils.cleanup(LOG, in);
+    }
+  }
+
+  /**
+   * Reads the stored delegation token sequence number, if any, into the
+   * given state. The state is left untouched when no entry exists.
+   *
+   * @param state recovery state that receives the sequence number
+   * @throws IOException if the read fails or the entry is too short
+   */
+  private void loadRMDTSecretManagerTokenSequenceNumber(RMState state)
+      throws IOException {
+    byte[] stored;
+    try {
+      stored = db.get(bytes(RM_DT_SEQUENCE_NUMBER_KEY));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+    if (stored == null) {
+      return;
+    }
+    DataInputStream in =
+        new DataInputStream(new ByteArrayInputStream(stored));
+    try {
+      state.rmSecretManagerState.dtSequenceNumber = in.readInt();
+    } finally {
+      IOUtils.cleanup(LOG, in);
+    }
+  }
+
+  /**
+   * Scans the entries whose keys start with the application key prefix and
+   * loads each application together with its attempts. Keys that contain a
+   * further separator after the application root are logged and skipped.
+   *
+   * @param state recovery state that receives the applications
+   * @throws IOException if the scan fails or an entry cannot be decoded
+   */
+  private void loadRMApps(RMState state) throws IOException {
+    int apps = 0;
+    int attempts = 0;
+    // Application ids start right after "<root><separator>".
+    int appIdStart = RM_APP_ROOT.length() + 1;
+    LeveldbIterator cursor = null;
+    try {
+      cursor = new LeveldbIterator(db);
+      cursor.seek(bytes(RM_APP_KEY_PREFIX));
+      while (cursor.hasNext()) {
+        Entry<byte[],byte[]> record = cursor.next();
+        String dbKey = asString(record.getKey());
+        if (!dbKey.startsWith(RM_APP_KEY_PREFIX)) {
+          break;
+        }
+
+        String appIdStr = dbKey.substring(appIdStart);
+        if (!appIdStr.contains(SEPARATOR)) {
+          // loadRMApp consumes the attempt entries that follow this one.
+          attempts += loadRMApp(state, cursor, appIdStr, record.getValue());
+          ++apps;
+        } else {
+          LOG.warn("Skipping extraneous data " + dbKey);
+        }
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (cursor != null) {
+        cursor.close();
+      }
+    }
+    LOG.info("Recovered " + apps + " applications and " + attempts
+        + " application attempts");
+  }
+
+  /**
+   * Registers one application in the recovery state and then consumes the
+   * iterator entries that follow it for as long as their keys lie beneath
+   * the application's key. Attempt entries are attached to the application;
+   * any other child key is logged and skipped.
+   *
+   * @param rmState recovery state that receives the application
+   * @param iter iterator positioned just after the application's own entry
+   * @param appIdStr application id taken from the database key
+   * @param appData serialized application state
+   * @return number of attempts attached to the application
+   * @throws IOException if an entry cannot be decoded
+   */
+  private int loadRMApp(RMState rmState, LeveldbIterator iter, String appIdStr,
+      byte[] appData) throws IOException {
+    ApplicationStateData appState = createApplicationState(appIdStr, appData);
+    ApplicationId appId =
+        appState.getApplicationSubmissionContext().getApplicationId();
+    rmState.appState.put(appId, appState);
+
+    String childPrefix = getApplicationNodeKey(appId) + SEPARATOR;
+    int childNameStart = childPrefix.length();
+    while (iter.hasNext()) {
+      // Peek first so an entry of the next application is left unconsumed.
+      Entry<byte[],byte[]> peeked = iter.peekNext();
+      String dbKey = asString(peeked.getKey());
+      if (!dbKey.startsWith(childPrefix)) {
+        break;
+      }
+
+      String childName = dbKey.substring(childNameStart);
+      if (!childName.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
+        LOG.warn("Ignoring unknown application key: " + dbKey);
+      } else {
+        ApplicationAttemptStateData attemptState =
+            createAttemptState(childName, peeked.getValue());
+        appState.attempts.put(attemptState.getAttemptId(), attemptState);
+      }
+      iter.next();
+    }
+
+    int numAttempts = appState.attempts.size();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Loaded application " + appId + " with " + numAttempts
+          + " attempts");
+    }
+    return numAttempts;
+  }
+
+  /**
+   * Parses serialized application state and checks that the application id
+   * inside it matches the id taken from the database key.
+   *
+   * @param appIdStr application id in string form, from the key
+   * @param data serialized application state
+   * @return the parsed application state
+   * @throws IOException if the data cannot be parsed
+   * @throws YarnRuntimeException if the two ids differ
+   */
+  private ApplicationStateData createApplicationState(String appIdStr,
+      byte[] data) throws IOException {
+    ApplicationId expectedId = ConverterUtils.toApplicationId(appIdStr);
+    ApplicationStateDataProto proto =
+        ApplicationStateDataProto.parseFrom(data);
+    ApplicationStateDataPBImpl appState =
+        new ApplicationStateDataPBImpl(proto);
+    if (expectedId.equals(
+        appState.getApplicationSubmissionContext().getApplicationId())) {
+      return appState;
+    }
+    throw new YarnRuntimeException("The database entry for " + expectedId
+        + " contains data for "
+        + appState.getApplicationSubmissionContext().getApplicationId());
+  }
+
+  /**
+   * Looks up a single application's state directly by its key.
+   *
+   * @param appId application to look up
+   * @return the stored state, or {@code null} when no entry exists
+   * @throws IOException if the read fails or the entry cannot be parsed
+   */
+  @VisibleForTesting
+  ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException {
+    byte[] stored;
+    try {
+      stored = db.get(bytes(getApplicationNodeKey(appId)));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+    if (stored != null) {
+      return createApplicationState(appId.toString(), stored);
+    }
+    return null;
+  }
+
+  /**
+   * Parses serialized attempt state and checks that the attempt id inside it
+   * matches the id taken from the database key.
+   *
+   * @param itemName attempt id in string form, from the key
+   * @param data serialized attempt state
+   * @return the parsed attempt state
+   * @throws IOException if the data cannot be parsed
+   * @throws YarnRuntimeException if the two ids differ
+   */
+  private ApplicationAttemptStateData createAttemptState(String itemName,
+      byte[] data) throws IOException {
+    ApplicationAttemptId expectedId =
+        ConverterUtils.toApplicationAttemptId(itemName);
+    ApplicationAttemptStateDataProto proto =
+        ApplicationAttemptStateDataProto.parseFrom(data);
+    ApplicationAttemptStateDataPBImpl attemptState =
+        new ApplicationAttemptStateDataPBImpl(proto);
+    if (expectedId.equals(attemptState.getAttemptId())) {
+      return attemptState;
+    }
+    throw new YarnRuntimeException("The database entry for " + expectedId
+        + " contains data for " + attemptState.getAttemptId());
+  }
+
+  /**
+   * Reads the AMRM token secret manager entry, if present, and stores its
+   * current and next master keys in the given state. The state is left
+   * untouched when no entry exists.
+   *
+   * @param rmState recovery state to populate
+   * @throws IOException if the read fails or the entry cannot be parsed
+   */
+  private void loadAMRMTokenSecretManagerState(RMState rmState)
+      throws IOException {
+    try {
+      byte[] stored = db.get(bytes(AMRMTOKEN_SECRET_MANAGER_ROOT));
+      if (stored == null) {
+        return;
+      }
+      AMRMTokenSecretManagerStateProto proto =
+          AMRMTokenSecretManagerStateProto.parseFrom(stored);
+      AMRMTokenSecretManagerStatePBImpl parsed =
+          new AMRMTokenSecretManagerStatePBImpl(proto);
+      rmState.amrmTokenSecretManagerState =
+          AMRMTokenSecretManagerState.newInstance(
+              parsed.getCurrentMasterKey(), parsed.getNextMasterKey());
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Writes an application's serialized state under the application's key,
+   * replacing any value already stored there.
+   *
+   * @param appId application being stored
+   * @param appStateData state to serialize and store
+   * @throws IOException if the database write fails
+   */
+  @Override
+  protected void storeApplicationStateInternal(ApplicationId appId,
+      ApplicationStateData appStateData) throws IOException {
+    String dbKey = getApplicationNodeKey(appId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing state for app " + appId + " at " + dbKey);
+    }
+    byte[] serialized = appStateData.getProto().toByteArray();
+    try {
+      db.put(bytes(dbKey), serialized);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Updates an application's state. An update is performed as a plain store
+   * under the same key.
+   *
+   * @param appId application being updated
+   * @param appStateData new state
+   * @throws IOException if the database write fails
+   */
+  @Override
+  protected void updateApplicationStateInternal(ApplicationId appId,
+      ApplicationStateData appStateData) throws IOException {
+    // Same key, so the store call replaces the previous value.
+    storeApplicationStateInternal(appId, appStateData);
+  }
+
+  /**
+   * Writes an attempt's serialized state under the attempt's key, replacing
+   * any value already stored there.
+   *
+   * @param attemptId attempt being stored
+   * @param attemptStateData state to serialize and store
+   * @throws IOException if the database write fails
+   */
+  @Override
+  protected void storeApplicationAttemptStateInternal(
+      ApplicationAttemptId attemptId,
+      ApplicationAttemptStateData attemptStateData) throws IOException {
+    String dbKey = getApplicationAttemptNodeKey(attemptId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing state for attempt " + attemptId + " at " + dbKey);
+    }
+    byte[] serialized = attemptStateData.getProto().toByteArray();
+    try {
+      db.put(bytes(dbKey), serialized);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Updates an attempt's state. An update is performed as a plain store
+   * under the same key.
+   *
+   * @param attemptId attempt being updated
+   * @param attemptStateData new state
+   * @throws IOException if the database write fails
+   */
+  @Override
+  protected void updateApplicationAttemptStateInternal(
+      ApplicationAttemptId attemptId,
+      ApplicationAttemptStateData attemptStateData) throws IOException {
+    // Same key, so the store call replaces the previous value.
+    storeApplicationAttemptStateInternal(attemptId, attemptStateData);
+  }
+
+  /**
+   * Deletes an application's entry and the entries of all attempts listed in
+   * the given state, using a single write batch.
+   *
+   * @param appState state of the application to remove, including the
+   *        attempts whose entries should be deleted
+   * @throws IOException if the database write fails
+   */
+  @Override
+  protected void removeApplicationStateInternal(ApplicationStateData appState)
+      throws IOException {
+    ApplicationId appId =
+        appState.getApplicationSubmissionContext().getApplicationId();
+    String appKey = getApplicationNodeKey(appId);
+    try {
+      WriteBatch deletions = db.createWriteBatch();
+      try {
+        deletions.delete(bytes(appKey));
+        for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
+          deletions.delete(
+              bytes(getApplicationAttemptNodeKey(appKey, attemptId)));
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Removing state for app " + appId + " and "
+              + appState.attempts.size() + " attempts" + " at " + appKey);
+        }
+        db.write(deletions);
+      } finally {
+        deletions.close();
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Stores a delegation token record and the latest sequence number together
+   * in one write batch.
+   *
+   * @param tokenId identifier of the token to store
+   * @param renewDate renew date stored alongside the token
+   * @param latestSequenceNumber sequence number to record
+   * @throws IOException if serialization or the database write fails
+   */
+  @Override
+  protected void storeRMDelegationTokenAndSequenceNumberState(
+      RMDelegationTokenIdentifier tokenId, Long renewDate,
+      int latestSequenceNumber) throws IOException {
+    String tokenKey = getRMDTTokenNodeKey(tokenId);
+    RMDelegationTokenIdentifierData tokenData =
+        new RMDelegationTokenIdentifierData(tokenId, renewDate);
+
+    // The sequence number is stored as a 4-byte int written by writeInt.
+    ByteArrayOutputStream seqBuffer = new ByteArrayOutputStream();
+    DataOutputStream seqOut = new DataOutputStream(seqBuffer);
+    try {
+      seqOut.writeInt(latestSequenceNumber);
+    } finally {
+      seqOut.close();
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing token to " + tokenKey);
+      LOG.debug("Storing " + latestSequenceNumber + " to "
+          + RM_DT_SEQUENCE_NUMBER_KEY);
+    }
+
+    try {
+      WriteBatch updates = db.createWriteBatch();
+      try {
+        updates.put(bytes(tokenKey), tokenData.toByteArray());
+        updates.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY),
+            seqBuffer.toByteArray());
+        db.write(updates);
+      } finally {
+        updates.close();
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Updates a delegation token record and the sequence number. An update is
+   * performed as a plain store under the same keys.
+   *
+   * @param tokenId identifier of the token to update
+   * @param renewDate new renew date
+   * @param latestSequenceNumber sequence number to record
+   * @throws IOException if serialization or the database write fails
+   */
+  @Override
+  protected void updateRMDelegationTokenAndSequenceNumberInternal(
+      RMDelegationTokenIdentifier tokenId, Long renewDate,
+      int latestSequenceNumber) throws IOException {
+    // Same keys, so the store call replaces the previous values.
+    storeRMDelegationTokenAndSequenceNumberState(
+        tokenId, renewDate, latestSequenceNumber);
+  }
+
+  /**
+   * Deletes the entry of a delegation token.
+   *
+   * @param tokenId identifier of the token to remove
+   * @throws IOException if the database delete fails
+   */
+  @Override
+  protected void removeRMDelegationTokenState(
+      RMDelegationTokenIdentifier tokenId) throws IOException {
+    String dbKey = getRMDTTokenNodeKey(tokenId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing token at " + dbKey);
+    }
+    byte[] rawKey = bytes(dbKey);
+    try {
+      db.delete(rawKey);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Serializes a delegation token master key and writes it under the key
+   * derived from its id.
+   *
+   * @param masterKey master key to store
+   * @throws IOException if serialization or the database write fails
+   */
+  @Override
+  protected void storeRMDTMasterKeyState(DelegationKey masterKey)
+      throws IOException {
+    String dbKey = getRMDTMasterKeyNodeKey(masterKey);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing token master key to " + dbKey);
+    }
+
+    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+    DataOutputStream keyOut = new DataOutputStream(buffer);
+    try {
+      masterKey.write(keyOut);
+    } finally {
+      keyOut.close();
+    }
+
+    try {
+      db.put(bytes(dbKey), buffer.toByteArray());
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Deletes the entry of a delegation token master key.
+   *
+   * @param masterKey master key to remove
+   * @throws IOException if the database delete fails
+   */
+  @Override
+  protected void removeRMDTMasterKeyState(DelegationKey masterKey)
+      throws IOException {
+    String masterKeyEntry = getRMDTMasterKeyNodeKey(masterKey);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing token master key at " + masterKeyEntry);
+    }
+    byte[] rawKey = bytes(masterKeyEntry);
+    try {
+      db.delete(rawKey);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Writes the AMRM token secret manager state to its single entry. A
+   * database failure is reported through
+   * {@code notifyStoreOperationFailed} rather than thrown.
+   *
+   * @param state state to copy, serialize and store
+   * @param isUpdate not consulted; store and update write the same entry
+   */
+  @Override
+  public void storeOrUpdateAMRMTokenSecretManagerState(
+      AMRMTokenSecretManagerState state, boolean isUpdate) {
+    AMRMTokenSecretManagerState copy =
+        AMRMTokenSecretManagerState.newInstance(state);
+    byte[] serialized = copy.getProto().toByteArray();
+    byte[] rawKey = bytes(AMRMTOKEN_SECRET_MANAGER_ROOT);
+    try {
+      db.put(rawKey, serialized);
+    } catch (DBException e) {
+      notifyStoreOperationFailed(e);
+    }
+  }
+
+  /**
+   * Closes the database if it is open and recursively deletes the database
+   * directory from the local file system.
+   *
+   * @throws IOException if no store path is configured, or closing the
+   *         database or deleting the directory fails
+   */
+  @Override
+  public void deleteStore() throws IOException {
+    Path root = getStorageDir();
+    LOG.info("Deleting state database at " + root);
+    // The handle is null when the store was never started or was already
+    // closed; the directory must still be removed in that case.
+    if (db != null) {
+      db.close();
+      db = null;
+    }
+    FileSystem fs = FileSystem.getLocal(getConfig());
+    fs.delete(root, true);
+  }
+
+  /**
+   * Counts every entry in the database, logging each key along the way.
+   *
+   * @return total number of entries
+   * @throws IOException if the scan fails
+   */
+  @VisibleForTesting
+  int getNumEntriesInDatabase() throws IOException {
+    int count = 0;
+    LeveldbIterator cursor = null;
+    try {
+      cursor = new LeveldbIterator(db);
+      for (cursor.seekToFirst(); cursor.hasNext(); ++count) {
+        Entry<byte[],byte[]> record = cursor.next();
+        LOG.info("entry: " + asString(record.getKey()));
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (cursor != null) {
+        cursor.close();
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Adapter handed to leveldb through {@code Options.logger} that forwards
+   * each message to a commons-logging logger at INFO level.
+   */
+  private static class LeveldbLogger implements Logger {
+    private static final Log LEVELDB_LOG =
+        LogFactory.getLog(LeveldbLogger.class);
+
+    /**
+     * Forwards one leveldb message to the log.
+     *
+     * @param message text supplied by leveldb
+     */
+    @Override
+    public void log(String message) {
+      LEVELDB_LOG.info(message);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7fba0bc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 8d6a7b6..3d07b37 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -621,6 +621,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
 
     // load state
     store = stateStoreHelper.getRMStateStore();
+    when(rmContext.getStateStore()).thenReturn(store);
     store.setRMDispatcher(dispatcher);
     RMState state = store.loadState();
     Assert.assertNotNull(state.getAMRMTokenSecretManagerState());
@@ -640,6 +641,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
 
     // load state
     store = stateStoreHelper.getRMStateStore();
+    when(rmContext.getStateStore()).thenReturn(store);
     store.setRMDispatcher(dispatcher);
     RMState state_2 = store.loadState();
     Assert.assertNotNull(state_2.getAMRMTokenSecretManagerState());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a7fba0bc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
new file mode 100644
index 0000000..ae885d2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestLeveldbRMStateStore.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Runs the inherited RMStateStoreTestBase scenarios against a
+ * LeveldbRMStateStore that is configured with TEST_DIR as its store path.
+ */
+public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
+
+  private static final File TEST_DIR = new File(
+      System.getProperty("test.build.data",
+          System.getProperty("java.io.tmpdir")),
+      TestLeveldbRMStateStore.class.getName());
+
+  private YarnConfiguration conf;
+  private LeveldbRMStateStore stateStore = null;
+
+  @Before
+  public void setup() throws IOException {
+    // Remove anything left in TEST_DIR before pointing the store at it.
+    FileUtil.fullyDelete(TEST_DIR);
+    conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.RM_LEVELDB_STORE_PATH, TEST_DIR.getPath());
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    if (stateStore != null) {
+      stateStore.close();
+    }
+    FileUtil.fullyDelete(TEST_DIR);
+  }
+
+  @Test(timeout = 60000)
+  public void testApps() throws Exception {
+    testRMAppStateStore(new LeveldbStateStoreTester());
+  }
+
+  @Test(timeout = 60000)
+  public void testClientTokens() throws Exception {
+    testRMDTSecretManagerStateStore(new LeveldbStateStoreTester());
+  }
+
+  @Test(timeout = 60000)
+  public void testVersion() throws Exception {
+    testCheckVersion(new LeveldbStateStoreTester());
+  }
+
+  @Test(timeout = 60000)
+  public void testEpoch() throws Exception {
+    testEpoch(new LeveldbStateStoreTester());
+  }
+
+  @Test(timeout = 60000)
+  public void testAppDeletion() throws Exception {
+    testAppDeletion(new LeveldbStateStoreTester());
+  }
+
+  @Test(timeout = 60000)
+  public void testDeleteStore() throws Exception {
+    testDeleteStore(new LeveldbStateStoreTester());
+  }
+
+  @Test(timeout = 60000)
+  public void testAMTokens() throws Exception {
+    testAMRMTokenSecretManagerStateStore(new LeveldbStateStoreTester());
+  }
+
+  class LeveldbStateStoreTester implements RMStateStoreHelper {
+
+    @Override
+    public RMStateStore getRMStateStore() throws Exception {
+      // Close any store opened earlier before starting a fresh one.
+      if (stateStore != null) {
+        stateStore.close();
+      }
+      stateStore = new LeveldbRMStateStore();
+      stateStore.init(conf);
+      stateStore.start();
+      return stateStore;
+    }
+
+    @Override
+    public boolean isFinalStateValid() throws Exception {
+      // Expect exactly 6 entries in the database:
+      //   1 for the version
+      //   2 for app 0010, which has one attempt
+      //   3 for app 0001, which has two attempts
+      return stateStore.getNumEntriesInDatabase() == 6;
+    }
+
+    @Override
+    public void writeVersion(Version version) throws Exception {
+      stateStore.dbStoreVersion(version);
+    }
+
+    @Override
+    public Version getCurrentVersion() throws Exception {
+      return stateStore.getCurrentVersion();
+    }
+
+    @Override
+    public boolean appExists(RMApp app) throws Exception {
+      // Reopen the store first if it is currently closed.
+      if (stateStore.isClosed()) {
+        getRMStateStore();
+      }
+      return stateStore.loadRMAppState(app.getApplicationId()) != null;
+    }
+  }
+}