You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2015/01/26 17:29:44 UTC

hadoop git commit: MAPREDUCE-6141. History server leveldb recovery store. Contributed by Jason Lowe

Repository: hadoop
Updated Branches:
  refs/heads/trunk 902c6ea7e -> 56b7ec71a


MAPREDUCE-6141. History server leveldb recovery store. Contributed by Jason Lowe


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/56b7ec71
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/56b7ec71
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/56b7ec71

Branch: refs/heads/trunk
Commit: 56b7ec71a69820ae12b4b9e2eb04b7368f721dbf
Parents: 902c6ea
Author: Jason Lowe <jl...@apache.org>
Authored: Mon Jan 26 16:28:55 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon Jan 26 16:28:55 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   2 +
 .../mapreduce/v2/jobhistory/JHAdminConfig.java  |   7 +
 .../src/main/resources/mapred-default.xml       |   8 +
 .../hadoop-mapreduce-client-hs/pom.xml          |   4 +
 .../HistoryServerLeveldbStateStoreService.java  | 379 +++++++++++++++++++
 ...stHistoryServerLeveldbStateStoreService.java | 207 ++++++++++
 6 files changed, 607 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/56b7ec71/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index b28fc65..35ceb2e 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -262,6 +262,8 @@ Release 2.7.0 - UNRELEASED
     cache with enabling wired encryption at the same time. 
     (Junping Du via xgong)
 
+    MAPREDUCE-6141. History server leveldb recovery store (jlowe)
+
   OPTIMIZATIONS
 
     MAPREDUCE-6169. MergeQueue should release reference to the current item 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56b7ec71/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
index e5a49b5..f7cba9f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
@@ -197,6 +197,13 @@ public class JHAdminConfig {
   public static final String MR_HS_FS_STATE_STORE_URI =
       MR_HISTORY_PREFIX + "recovery.store.fs.uri";
 
+  /**
+   * The local path where server state will be stored when
+   * HistoryServerLeveldbStateStoreService is configured as the state store
+   */
+  public static final String MR_HS_LEVELDB_STATE_STORE_PATH =
+      MR_HISTORY_PREFIX + "recovery.store.leveldb.path";
+
   /** Whether to use fixed ports with the minicluster. */
   public static final String MR_HISTORY_MINICLUSTER_FIXED_PORTS = MR_HISTORY_PREFIX
        + "minicluster.fixed.ports";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56b7ec71/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 57a17a8..4535137 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1569,6 +1569,14 @@
 </property>
 
 <property>
+  <name>mapreduce.jobhistory.recovery.store.leveldb.path</name>
+  <value>${hadoop.tmp.dir}/mapred/history/recoverystore</value>
+  <description>The URI where history server state will be stored if
+  HistoryServerLeveldbSystemStateStoreService is configured as the recovery
+  storage class.</description>
+</property>
+
+<property>
   <name>mapreduce.jobhistory.http.policy</name>
   <value>HTTP_ONLY</value>
   <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56b7ec71/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
index adeb9fa..fa8162b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
@@ -63,6 +63,10 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.fusesource.leveldbjni</groupId>
+      <artifactId>leveldbjni-all</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56b7ec71/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerLeveldbStateStoreService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerLeveldbStateStoreService.java
new file mode 100644
index 0000000..16366b1
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerLeveldbStateStoreService.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import static org.fusesource.leveldbjni.JniDBFactory.asString;
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.fusesource.leveldbjni.internal.NativeDB;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.Logger;
+import org.iq80.leveldb.Options;
+
+public class HistoryServerLeveldbStateStoreService extends
+    HistoryServerStateStoreService {
+
+  private static final String DB_NAME = "mr-jhs-state";
+  private static final String DB_SCHEMA_VERSION_KEY = "jhs-schema-version";
+  private static final String TOKEN_MASTER_KEY_KEY_PREFIX = "tokens/key_";
+  private static final String TOKEN_STATE_KEY_PREFIX = "tokens/token_";
+
+  private static final Version CURRENT_VERSION_INFO =
+      Version.newInstance(1, 0);
+
+  private DB db;
+
+  public static final Log LOG =
+      LogFactory.getLog(HistoryServerLeveldbStateStoreService.class);
+
+  @Override
+  protected void initStorage(Configuration conf) throws IOException {
+  }
+
+  @Override
+  protected void startStorage() throws IOException {
+    Path storeRoot = createStorageDir(getConfig());
+    Options options = new Options();
+    options.createIfMissing(false);
+    options.logger(new LeveldbLogger());
+    LOG.info("Using state database at " + storeRoot + " for recovery");
+    File dbfile = new File(storeRoot.toString());
+    try {
+      db = JniDBFactory.factory.open(dbfile, options);
+    } catch (NativeDB.DBException e) {
+      if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
+        LOG.info("Creating state database at " + dbfile);
+        options.createIfMissing(true);
+        try {
+          db = JniDBFactory.factory.open(dbfile, options);
+          // store version
+          storeVersion();
+        } catch (DBException dbErr) {
+          throw new IOException(dbErr.getMessage(), dbErr);
+        }
+      } else {
+        throw e;
+      }
+    }
+    checkVersion();
+  }
+
+  @Override
+  protected void closeStorage() throws IOException {
+    if (db != null) {
+      db.close();
+      db = null;
+    }
+  }
+
+  @Override
+  public HistoryServerState loadState() throws IOException {
+    HistoryServerState state = new HistoryServerState();
+    int numKeys = loadTokenMasterKeys(state);
+    LOG.info("Recovered " + numKeys + " token master keys");
+    int numTokens = loadTokens(state);
+    LOG.info("Recovered " + numTokens + " tokens");
+    return state;
+  }
+
+  private int loadTokenMasterKeys(HistoryServerState state)
+      throws IOException {
+    int numKeys = 0;
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(TOKEN_MASTER_KEY_KEY_PREFIX));
+      while (iter.hasNext()) {
+        Entry<byte[],byte[]> entry = iter.next();
+        String key = asString(entry.getKey());
+        if (!key.startsWith(TOKEN_MASTER_KEY_KEY_PREFIX)) {
+          break;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Loading master key from " + key);
+        }
+        try {
+          loadTokenMasterKey(state, entry.getValue());
+        } catch (IOException e) {
+          throw new IOException("Error loading token master key from " + key,
+              e);
+        }
+        ++numKeys;
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+    return numKeys;
+  }
+
+  private void loadTokenMasterKey(HistoryServerState state, byte[] data)
+      throws IOException {
+    DelegationKey key = new DelegationKey();
+    DataInputStream in =
+        new DataInputStream(new ByteArrayInputStream(data));
+    try {
+      key.readFields(in);
+    } finally {
+      IOUtils.cleanup(LOG, in);
+    }
+    state.tokenMasterKeyState.add(key);
+  }
+
+  private int loadTokens(HistoryServerState state) throws IOException {
+    int numTokens = 0;
+    LeveldbIterator iter = null;
+    try {
+      iter = new LeveldbIterator(db);
+      iter.seek(bytes(TOKEN_STATE_KEY_PREFIX));
+      while (iter.hasNext()) {
+        Entry<byte[],byte[]> entry = iter.next();
+        String key = asString(entry.getKey());
+        if (!key.startsWith(TOKEN_STATE_KEY_PREFIX)) {
+          break;
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Loading token from " + key);
+        }
+        try {
+          loadToken(state, entry.getValue());
+        } catch (IOException e) {
+          throw new IOException("Error loading token state from " + key, e);
+        }
+        ++numTokens;
+      }
+    } catch (DBException e) {
+      throw new IOException(e);
+    } finally {
+      if (iter != null) {
+        iter.close();
+      }
+    }
+    return numTokens;
+  }
+
+  private void loadToken(HistoryServerState state, byte[] data)
+      throws IOException {
+    MRDelegationTokenIdentifier tokenId = new MRDelegationTokenIdentifier();
+    long renewDate;
+    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
+    try {
+      tokenId.readFields(in);
+      renewDate = in.readLong();
+    } finally {
+      IOUtils.cleanup(LOG, in);
+    }
+    state.tokenState.put(tokenId, renewDate);
+  }
+
+  @Override
+  public void storeToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing token " + tokenId.getSequenceNumber());
+    }
+
+    ByteArrayOutputStream memStream = new ByteArrayOutputStream();
+    DataOutputStream dataStream = new DataOutputStream(memStream);
+    try {
+      tokenId.write(dataStream);
+      dataStream.writeLong(renewDate);
+      dataStream.close();
+      dataStream = null;
+    } finally {
+      IOUtils.cleanup(LOG, dataStream);
+    }
+
+    String dbKey = getTokenDatabaseKey(tokenId);
+    try {
+      db.put(bytes(dbKey), memStream.toByteArray());
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void updateToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
+      throws IOException {
+    storeToken(tokenId, renewDate);
+  }
+
+  @Override
+  public void removeToken(MRDelegationTokenIdentifier tokenId)
+      throws IOException {
+    String dbKey = getTokenDatabaseKey(tokenId);
+    try {
+      db.delete(bytes(dbKey));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private String getTokenDatabaseKey(MRDelegationTokenIdentifier tokenId) {
+    return TOKEN_STATE_KEY_PREFIX + tokenId.getSequenceNumber();
+  }
+
+  @Override
+  public void storeTokenMasterKey(DelegationKey masterKey)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing master key " + masterKey.getKeyId());
+    }
+
+    ByteArrayOutputStream memStream = new ByteArrayOutputStream();
+    DataOutputStream dataStream = new DataOutputStream(memStream);
+    try {
+      masterKey.write(dataStream);
+      dataStream.close();
+      dataStream = null;
+    } finally {
+      IOUtils.cleanup(LOG, dataStream);
+    }
+
+    String dbKey = getTokenMasterKeyDatabaseKey(masterKey);
+    try {
+      db.put(bytes(dbKey), memStream.toByteArray());
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void removeTokenMasterKey(DelegationKey masterKey)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing master key " + masterKey.getKeyId());
+    }
+
+    String dbKey = getTokenMasterKeyDatabaseKey(masterKey);
+    try {
+      db.delete(bytes(dbKey));
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private String getTokenMasterKeyDatabaseKey(DelegationKey masterKey) {
+    return TOKEN_MASTER_KEY_KEY_PREFIX + masterKey.getKeyId();
+  }
+
+  private Path createStorageDir(Configuration conf) throws IOException {
+    String confPath = conf.get(JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH);
+    if (confPath == null) {
+      throw new IOException("No store location directory configured in " +
+          JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH);
+    }
+    Path root = new Path(confPath, DB_NAME);
+    FileSystem fs = FileSystem.getLocal(conf);
+    fs.mkdirs(root, new FsPermission((short)0700));
+    return root;
+  }
+
+  Version loadVersion() throws IOException {
+    byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
+    // if version is not stored previously, treat it as 1.0.
+    if (data == null || data.length == 0) {
+      return Version.newInstance(1, 0);
+    }
+    Version version =
+        new VersionPBImpl(VersionProto.parseFrom(data));
+    return version;
+  }
+
+  private void storeVersion() throws IOException {
+    dbStoreVersion(CURRENT_VERSION_INFO);
+  }
+
+  void dbStoreVersion(Version state) throws IOException {
+    String key = DB_SCHEMA_VERSION_KEY;
+    byte[] data =
+        ((VersionPBImpl) state).getProto().toByteArray();
+    try {
+      db.put(bytes(key), data);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  Version getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+
+  /**
+   * 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
+   * 2) Any incompatible change of state-store is a major upgrade, and any
+   *    compatible change of state-store is a minor upgrade.
+   * 3) Within a minor upgrade, say 1.1 to 1.2:
+   *    overwrite the version info and proceed as normal.
+   * 4) Within a major upgrade, say 1.2 to 2.0:
+   *    throw exception and indicate user to use a separate upgrade tool to
+   *    upgrade state or remove incompatible old state.
+   */
+  private void checkVersion() throws IOException {
+    Version loadedVersion = loadVersion();
+    LOG.info("Loaded state version info " + loadedVersion);
+    if (loadedVersion.equals(getCurrentVersion())) {
+      return;
+    }
+    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+      LOG.info("Storing state version info " + getCurrentVersion());
+      storeVersion();
+    } else {
+      throw new IOException(
+        "Incompatible version for state: expecting state version "
+            + getCurrentVersion() + ", but loading version " + loadedVersion);
+    }
+  }
+
+  private static class LeveldbLogger implements Logger {
+    private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
+
+    @Override
+    public void log(String message) {
+      LOG.info(message);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/56b7ec71/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerLeveldbStateStoreService.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerLeveldbStateStoreService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerLeveldbStateStoreService.java
new file mode 100644
index 0000000..2af2f84
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerLeveldbStateStoreService.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHistoryServerLeveldbStateStoreService {
+
+  private static final File testDir = new File(
+      System.getProperty("test.build.data",
+          System.getProperty("java.io.tmpdir")),
+      "TestHistoryServerLeveldbSystemStateStoreService");
+
+  private Configuration conf;
+
+  @Before
+  public void setup() {
+    FileUtil.fullyDelete(testDir);
+    testDir.mkdirs();
+    conf = new Configuration();
+    conf.setBoolean(JHAdminConfig.MR_HS_RECOVERY_ENABLE, true);
+    conf.setClass(JHAdminConfig.MR_HS_STATE_STORE,
+        HistoryServerLeveldbStateStoreService.class,
+        HistoryServerStateStoreService.class);
+    conf.set(JHAdminConfig.MR_HS_LEVELDB_STATE_STORE_PATH,
+        testDir.getAbsoluteFile().toString());
+  }
+
+  @After
+  public void cleanup() {
+    FileUtil.fullyDelete(testDir);
+  }
+
+  private HistoryServerStateStoreService createAndStartStore()
+      throws IOException {
+    HistoryServerStateStoreService store =
+        HistoryServerStateStoreServiceFactory.getStore(conf);
+    assertTrue("Factory did not create a leveldb store",
+        store instanceof HistoryServerLeveldbStateStoreService);
+    store.init(conf);
+    store.start();
+    return store;
+  }
+
+  @Test
+  public void testCheckVersion() throws IOException {
+    HistoryServerLeveldbStateStoreService store =
+        new HistoryServerLeveldbStateStoreService();
+    store.init(conf);
+    store.start();
+
+    // default version
+    Version defaultVersion = store.getCurrentVersion();
+    assertEquals(defaultVersion, store.loadVersion());
+
+    // compatible version
+    Version compatibleVersion =
+        Version.newInstance(defaultVersion.getMajorVersion(),
+          defaultVersion.getMinorVersion() + 2);
+    store.dbStoreVersion(compatibleVersion);
+    assertEquals(compatibleVersion, store.loadVersion());
+    store.close();
+    store = new HistoryServerLeveldbStateStoreService();
+    store.init(conf);
+    store.start();
+
+    // overwrite the compatible version
+    assertEquals(defaultVersion, store.loadVersion());
+
+    // incompatible version
+    Version incompatibleVersion =
+      Version.newInstance(defaultVersion.getMajorVersion() + 1,
+          defaultVersion.getMinorVersion());
+    store.dbStoreVersion(incompatibleVersion);
+    store.close();
+    store = new HistoryServerLeveldbStateStoreService();
+    try {
+      store.init(conf);
+      store.start();
+      fail("Incompatible version, should have thrown before here.");
+    } catch (ServiceStateException e) {
+      assertTrue("Exception message mismatch",
+        e.getMessage().contains("Incompatible version for state:"));
+    }
+    store.close();
+  }
+
+  @Test
+  public void testTokenStore() throws IOException {
+    HistoryServerStateStoreService store = createAndStartStore();
+
+    // verify initially the store is empty
+    HistoryServerState state = store.loadState();
+    assertTrue("token state not empty", state.tokenState.isEmpty());
+    assertTrue("key state not empty", state.tokenMasterKeyState.isEmpty());
+
+    // store a key and some tokens
+    final DelegationKey key1 = new DelegationKey(1, 2, "keyData1".getBytes());
+    final MRDelegationTokenIdentifier token1 =
+        new MRDelegationTokenIdentifier(new Text("tokenOwner1"),
+            new Text("tokenRenewer1"), new Text("tokenUser1"));
+    token1.setSequenceNumber(1);
+    final Long tokenDate1 = 1L;
+    final MRDelegationTokenIdentifier token2 =
+        new MRDelegationTokenIdentifier(new Text("tokenOwner2"),
+            new Text("tokenRenewer2"), new Text("tokenUser2"));
+    token2.setSequenceNumber(12345678);
+    final Long tokenDate2 = 87654321L;
+
+    store.storeTokenMasterKey(key1);
+    store.storeToken(token1, tokenDate1);
+    store.storeToken(token2, tokenDate2);
+    store.close();
+
+    // verify the key and tokens can be recovered
+    store = createAndStartStore();
+    state = store.loadState();
+    assertEquals("incorrect loaded token count", 2, state.tokenState.size());
+    assertTrue("missing token 1", state.tokenState.containsKey(token1));
+    assertEquals("incorrect token 1 date", tokenDate1,
+        state.tokenState.get(token1));
+    assertTrue("missing token 2", state.tokenState.containsKey(token2));
+    assertEquals("incorrect token 2 date", tokenDate2,
+        state.tokenState.get(token2));
+    assertEquals("incorrect master key count", 1,
+        state.tokenMasterKeyState.size());
+    assertTrue("missing master key 1",
+        state.tokenMasterKeyState.contains(key1));
+
+    // store some more keys and tokens, remove the previous key and one
+    // of the tokens, and renew a previous token
+    final DelegationKey key2 = new DelegationKey(3, 4, "keyData2".getBytes());
+    final DelegationKey key3 = new DelegationKey(5, 6, "keyData3".getBytes());
+    final MRDelegationTokenIdentifier token3 =
+        new MRDelegationTokenIdentifier(new Text("tokenOwner3"),
+            new Text("tokenRenewer3"), new Text("tokenUser3"));
+    token3.setSequenceNumber(12345679);
+    final Long tokenDate3 = 87654321L;
+
+    store.removeToken(token1);
+    store.storeTokenMasterKey(key2);
+    final Long newTokenDate2 = 975318642L;
+    store.updateToken(token2, newTokenDate2);
+    store.removeTokenMasterKey(key1);
+    store.storeTokenMasterKey(key3);
+    store.storeToken(token3, tokenDate3);
+    store.close();
+
+    // verify the new keys and tokens are recovered, the removed key and
+    // token are no longer present, and the renewed token has the updated
+    // expiration date
+    store = createAndStartStore();
+    state = store.loadState();
+    assertEquals("incorrect loaded token count", 2, state.tokenState.size());
+    assertFalse("token 1 not removed", state.tokenState.containsKey(token1));
+    assertTrue("missing token 2", state.tokenState.containsKey(token2));
+    assertEquals("incorrect token 2 date", newTokenDate2,
+        state.tokenState.get(token2));
+    assertTrue("missing token 3", state.tokenState.containsKey(token3));
+    assertEquals("incorrect token 3 date", tokenDate3,
+        state.tokenState.get(token3));
+    assertEquals("incorrect master key count", 2,
+        state.tokenMasterKeyState.size());
+    assertFalse("master key 1 not removed",
+        state.tokenMasterKeyState.contains(key1));
+    assertTrue("missing master key 2",
+        state.tokenMasterKeyState.contains(key2));
+    assertTrue("missing master key 3",
+        state.tokenMasterKeyState.contains(key3));
+    store.close();
+  }
+}