You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/09/27 20:19:42 UTC

svn commit: r1527015 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/re...

Author: jlowe
Date: Fri Sep 27 18:19:41 2013
New Revision: 1527015

URL: http://svn.apache.org/r1527015
Log:
MAPREDUCE-5332. Support token-preserving restart of history server. Contributed by Jason Lowe

Added:
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerNullStateStoreService.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreService.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreServiceFactory.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerMemStateStoreService.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java   (with props)
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJHSDelegationTokenSecretManager.java   (with props)
Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JHSDelegationTokenSecretManager.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1527015&r1=1527014&r2=1527015&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Sep 27 18:19:41 2013
@@ -155,6 +155,8 @@ Release 2.3.0 - UNRELEASED
     MAPREDUCE-5411. Refresh size of loaded job cache on history server (Ashwin
     Shankar via jlowe)
 
+    MAPREDUCE-5332. Support token-preserving restart of history server (jlowe)
+
   IMPROVEMENTS
 
     MAPREDUCE-434. LocalJobRunner limited to single reducer (Sandy Ryza and

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java?rev=1527015&r1=1527014&r2=1527015&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JHAdminConfig.java Fri Sep 27 18:19:41 2013
@@ -164,6 +164,27 @@ public class JHAdminConfig {
   public static final String MR_HISTORY_STORAGE =
     MR_HISTORY_PREFIX + "store.class";
 
+  /**
+   * Enable the history server to store server state and recover server state
+   * upon startup.
+   */
+  public static final String MR_HS_RECOVERY_ENABLE =
+      MR_HISTORY_PREFIX + "recovery.enable";
+  public static final boolean DEFAULT_MR_HS_RECOVERY_ENABLE = false;
+
+  /**
+   * The HistoryServerStateStoreService class to store and recover server state
+   */
+  public static final String MR_HS_STATE_STORE =
+      MR_HISTORY_PREFIX + "recovery.store.class";
+
+  /**
+   * The URI where server state will be stored when
+   * HistoryServerFileSystemStateStoreService is configured as the state store
+   */
+  public static final String MR_HS_FS_STATE_STORE_URI =
+      MR_HISTORY_PREFIX + "recovery.store.fs.uri";
+
   /** Whether to use fixed ports with the minicluster. */
   public static final String MR_HISTORY_MINICLUSTER_FIXED_PORTS = MR_HISTORY_PREFIX
        + "minicluster.fixed.ports";

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1527015&r1=1527014&r2=1527015&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Fri Sep 27 18:19:41 2013
@@ -1181,4 +1181,28 @@
   <description>ACL of who can be admin of the History server.</description>
 </property>
 
+<property>
+  <name>mapreduce.jobhistory.recovery.enable</name>
+  <value>false</value>
+  <description>Enable the history server to store server state and recover
+  server state upon startup.  If enabled then
+  mapreduce.jobhistory.recovery.store.class must be specified.</description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.recovery.store.class</name>
+  <value>org.apache.hadoop.mapreduce.v2.hs.HistoryServerFileSystemStateStoreService</value>
+  <description>The HistoryServerStateStoreService class to store history server
+  state for recovery.</description>
+</property>
+
+<property>
+  <name>mapreduce.jobhistory.recovery.store.fs.uri</name>
+  <value>${hadoop.tmp.dir}/mapred/history/recoverystore</value>
+  <!--value>hdfs://localhost:9000/mapred/history/recoverystore</value-->
+  <description>The URI where history server state will be stored if
+  HistoryServerFileSystemStateStoreService is configured as the recovery
+  storage class.</description>
+</property>
+
 </configuration>

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java?rev=1527015&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java Fri Sep 27 18:19:41 2013
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+
+/**
+ * A history server state storage implementation that supports any persistent
+ * storage that adheres to the FileSystem interface.
+ *
+ * State is kept below the configured store URI in a "HistoryServerState"
+ * directory. Master keys are written to "tokens/keys/key_[keyId]" and
+ * delegation tokens to "tokens/tb_NNN/token_[sequenceNumber]", where the
+ * bucket number is the token sequence number modulo the number of buckets.
+ */
+@Private
+@Unstable
+public class HistoryServerFileSystemStateStoreService
+    extends HistoryServerStateStoreService {
+
+  public static final Log LOG =
+      LogFactory.getLog(HistoryServerFileSystemStateStoreService.class);
+
+  // names of the directories and files that make up the on-disk layout
+  private static final String ROOT_STATE_DIR_NAME = "HistoryServerState";
+  private static final String TOKEN_STATE_DIR_NAME = "tokens";
+  private static final String TOKEN_KEYS_DIR_NAME = "keys";
+  private static final String TOKEN_BUCKET_DIR_PREFIX = "tb_";
+  private static final String TOKEN_BUCKET_NAME_FORMAT =
+      TOKEN_BUCKET_DIR_PREFIX + "%03d";
+  private static final String TOKEN_MASTER_KEY_FILE_PREFIX = "key_";
+  private static final String TOKEN_FILE_PREFIX = "token_";
+  private static final String TMP_FILE_PREFIX = "tmp-";
+  // state directories and files are accessible by the owner only
+  private static final FsPermission DIR_PERMISSIONS =
+      new FsPermission((short)0700);
+  private static final FsPermission FILE_PERMISSIONS =
+      new FsPermission((short)0400);
+  private static final int NUM_TOKEN_BUCKETS = 1000;
+
+  private FileSystem fs;
+  private Path rootStatePath;
+  private Path tokenStatePath;
+  private Path tokenKeysStatePath;
+
+  /**
+   * Reads the store location from the configuration and derives the root
+   * state path. The file system itself is not touched until startStorage().
+   *
+   * @param conf the configuration
+   * @throws IOException if no store URI has been configured
+   */
+  @Override
+  protected void initStorage(Configuration conf)
+      throws IOException {
+    final String storeUri = conf.get(JHAdminConfig.MR_HS_FS_STATE_STORE_URI);
+    if (storeUri == null) {
+      throw new IOException("No store location URI configured in " +
+          JHAdminConfig.MR_HS_FS_STATE_STORE_URI);
+    }
+
+    LOG.info("Using " + storeUri + " for history server state storage");
+    rootStatePath = new Path(storeUri, ROOT_STATE_DIR_NAME);
+  }
+
+  /**
+   * Resolves the file system of the root state path and makes sure the
+   * directory skeleton exists: the root, the token directory, the key
+   * directory and every token bucket.
+   *
+   * @throws IOException if a directory cannot be checked or created
+   */
+  @Override
+  protected void startStorage() throws IOException {
+    fs = rootStatePath.getFileSystem(getConfig());
+    createDir(rootStatePath);
+    tokenStatePath = new Path(rootStatePath, TOKEN_STATE_DIR_NAME);
+    createDir(tokenStatePath);
+    tokenKeysStatePath = new Path(tokenStatePath, TOKEN_KEYS_DIR_NAME);
+    createDir(tokenKeysStatePath);
+    for (int i=0; i < NUM_TOKEN_BUCKETS; ++i) {
+      createDir(getTokenBucketPath(i));
+    }
+  }
+
+  @Override
+  protected void closeStorage() throws IOException {
+    // don't close the filesystem as it's part of the filesystem cache
+    // and other clients may still be using it
+  }
+
+  /**
+   * Loads all master keys and tokens found below the token state directory.
+   *
+   * @return the recovered state
+   * @throws IOException if the store cannot be listed or a file cannot be
+   *         read or parsed
+   */
+  @Override
+  public HistoryServerState loadState() throws IOException {
+    LOG.info("Loading history server state from " + rootStatePath);
+    HistoryServerState state = new HistoryServerState();
+    loadTokenState(state);
+    return state;
+  }
+
+  /**
+   * Writes a new token file.
+   *
+   * @param tokenId the token to store
+   * @param renewDate the renewal date written after the token identifier
+   * @throws IOException if a file for this token already exists or the file
+   *         cannot be written
+   */
+  @Override
+  public void storeToken(MRDelegationTokenIdentifier tokenId,
+      Long renewDate) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing token " + tokenId.getSequenceNumber());
+    }
+
+    Path tokenPath = getTokenPath(tokenId);
+    if (fs.exists(tokenPath)) {
+      throw new IOException(tokenPath + " already exists");
+    }
+
+    createFile(tokenPath, buildTokenData(tokenId, renewDate));
+  }
+
+  /**
+   * Rewrites the token file with a new renewal date.
+   *
+   * NOTE(review): this relies on FileSystem.rename() replacing the existing
+   * token file. Some file systems are documented to refuse a rename onto an
+   * existing destination, in which case createFile() throws -- confirm the
+   * behaviour of the file system used for the store.
+   *
+   * @param tokenId the token to update
+   * @param renewDate the new renewal date
+   * @throws IOException if the file cannot be written
+   */
+  @Override
+  public void updateToken(MRDelegationTokenIdentifier tokenId,
+      Long renewDate) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Updating token " + tokenId.getSequenceNumber());
+    }
+    createFile(getTokenPath(tokenId), buildTokenData(tokenId, renewDate));
+  }
+
+  /**
+   * Deletes the file of the given token. A file that is already gone is not
+   * treated as an error when the file system reports it as not found.
+   *
+   * @param tokenId the token to remove
+   * @throws IOException if the file cannot be deleted
+   */
+  @Override
+  public void removeToken(MRDelegationTokenIdentifier tokenId)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing token " + tokenId.getSequenceNumber());
+    }
+    deleteFile(getTokenPath(tokenId));
+  }
+
+  /**
+   * Writes a new master key file named after the key id.
+   *
+   * @param key the master key to store
+   * @throws IOException if a file for this key already exists or the file
+   *         cannot be written
+   */
+  @Override
+  public void storeTokenMasterKey(DelegationKey key) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing master key " + key.getKeyId());
+    }
+
+    Path keyPath = new Path(tokenKeysStatePath,
+        TOKEN_MASTER_KEY_FILE_PREFIX + key.getKeyId());
+    if (fs.exists(keyPath)) {
+      throw new IOException(keyPath + " already exists");
+    }
+
+    // serialize in memory first so the file is written in a single call
+    ByteArrayOutputStream memStream = new ByteArrayOutputStream();
+    DataOutputStream dataStream = new DataOutputStream(memStream);
+    try {
+      key.write(dataStream);
+    } finally {
+      IOUtils.cleanup(LOG, dataStream);
+    }
+
+    createFile(keyPath, memStream.toByteArray());
+  }
+
+  /**
+   * Deletes the file of the given master key.
+   *
+   * @param key the master key to remove
+   * @throws IOException if the file cannot be deleted
+   */
+  @Override
+  public void removeTokenMasterKey(DelegationKey key)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing master key " + key.getKeyId());
+    }
+
+    Path keyPath = new Path(tokenKeysStatePath,
+        TOKEN_MASTER_KEY_FILE_PREFIX + key.getKeyId());
+    deleteFile(keyPath);
+  }
+
+  /** Maps a token to its bucket: sequence number modulo the bucket count. */
+  private static int getBucketId(MRDelegationTokenIdentifier tokenId) {
+    return tokenId.getSequenceNumber() % NUM_TOKEN_BUCKETS;
+  }
+
+  /** Returns the directory of the given bucket below the token directory. */
+  private Path getTokenBucketPath(int bucketId) {
+    return new Path(tokenStatePath,
+        String.format(TOKEN_BUCKET_NAME_FORMAT, bucketId));
+  }
+
+  /** Returns the file path used for the given token inside its bucket. */
+  private Path getTokenPath(MRDelegationTokenIdentifier tokenId) {
+    Path bucketPath = getTokenBucketPath(getBucketId(tokenId));
+    return new Path(bucketPath,
+        TOKEN_FILE_PREFIX + tokenId.getSequenceNumber());
+  }
+
+  /**
+   * Ensures that dir exists as a directory with DIR_PERMISSIONS, creating it
+   * or correcting its permissions as needed.
+   *
+   * @throws FileAlreadyExistsException if the path exists but is not a
+   *         directory
+   */
+  private void createDir(Path dir) throws IOException {
+    try {
+      FileStatus status = fs.getFileStatus(dir);
+      if (!status.isDirectory()) {
+        throw new FileAlreadyExistsException("Unexpected file in store: "
+            + dir);
+      }
+      if (!status.getPermission().equals(DIR_PERMISSIONS)) {
+        fs.setPermission(dir, DIR_PERMISSIONS);
+      }
+    } catch (FileNotFoundException e) {
+      fs.mkdirs(dir, DIR_PERMISSIONS);
+    }
+  }
+
+  /**
+   * Writes data to a temporary sibling of file and then renames the
+   * temporary file to file, so that readers never see a partially written
+   * file under the final name.
+   *
+   * @param file the final path of the file
+   * @param data the complete file contents
+   * @throws IOException if writing, closing or renaming fails; the temporary
+   *         file is removed on a best-effort basis in that case
+   */
+  private void createFile(Path file, byte[] data) throws IOException {
+    final int WRITE_BUFFER_SIZE = 4096;
+    Path tmp = new Path(file.getParent(), TMP_FILE_PREFIX + file.getName());
+    FSDataOutputStream out = fs.create(tmp, FILE_PERMISSIONS, true,
+        WRITE_BUFFER_SIZE, fs.getDefaultReplication(tmp),
+        fs.getDefaultBlockSize(tmp), null);
+    try {
+      try {
+        out.write(data);
+        // Close explicitly: IOUtils.cleanup() only logs a failed close, which
+        // would let an incompletely written file be renamed into place.
+        out.close();
+        out = null;
+      } finally {
+        // the stream is still non-null here only if write or close failed
+        IOUtils.cleanup(LOG, out);
+      }
+      if (!fs.rename(tmp, file)) {
+        throw new IOException("Could not rename " + tmp + " to " + file);
+      }
+    } catch (IOException e) {
+      // do not let a failing cleanup hide the error that got us here
+      try {
+        fs.delete(tmp, false);
+      } catch (IOException deleteError) {
+        LOG.warn("Unable to remove temporary file " + tmp, deleteError);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Reads exactly numBytes bytes from the start of file.
+   *
+   * @param file the file to read
+   * @param numBytes the number of bytes to read; narrowed to an int
+   * @return the bytes read
+   * @throws IOException if the file cannot be opened or is shorter than
+   *         numBytes
+   */
+  private byte[] readFile(Path file, long numBytes) throws IOException {
+    byte[] data = new byte[(int)numBytes];
+    FSDataInputStream in = fs.open(file);
+    try {
+      in.readFully(data);
+    } finally {
+      IOUtils.cleanup(LOG, in);
+    }
+    return data;
+  }
+
+  /**
+   * Deletes a single file. A FileNotFoundException from the file system is
+   * treated as success.
+   *
+   * @throws IOException if the file system reports that nothing was deleted
+   */
+  private void deleteFile(Path file) throws IOException {
+    boolean deleted;
+    try {
+      deleted = fs.delete(file, false);
+    } catch (FileNotFoundException e) {
+      deleted = true;
+    }
+    if (!deleted) {
+      throw new IOException("Unable to delete " + file);
+    }
+  }
+
+  /**
+   * Serializes a token file: the token identifier followed by the renewal
+   * date as a long.
+   */
+  private byte[] buildTokenData(MRDelegationTokenIdentifier tokenId,
+      Long renewDate) throws IOException {
+    ByteArrayOutputStream memStream = new ByteArrayOutputStream();
+    DataOutputStream dataStream = new DataOutputStream(memStream);
+    try {
+      tokenId.write(dataStream);
+      dataStream.writeLong(renewDate);
+    } finally {
+      IOUtils.cleanup(LOG, dataStream);
+    }
+    return memStream.toByteArray();
+  }
+
+  /** Reads one master key file and adds the key to the given state. */
+  private void loadTokenMasterKey(HistoryServerState state, Path keyFile,
+      long numKeyFileBytes) throws IOException {
+    DelegationKey key = new DelegationKey();
+    byte[] keyData = readFile(keyFile, numKeyFileBytes);
+    DataInputStream in =
+        new DataInputStream(new ByteArrayInputStream(keyData));
+    try {
+      key.readFields(in);
+    } finally {
+      IOUtils.cleanup(LOG, in);
+    }
+    state.tokenMasterKeyState.add(key);
+  }
+
+  /**
+   * Reads one token file (identifier followed by renewal date) and records
+   * it in the given state.
+   *
+   * @return the identifier that was read
+   */
+  private MRDelegationTokenIdentifier loadToken(HistoryServerState state,
+      Path tokenFile, long numTokenFileBytes) throws IOException {
+    MRDelegationTokenIdentifier tokenId = new MRDelegationTokenIdentifier();
+    long renewDate;
+    byte[] tokenData = readFile(tokenFile, numTokenFileBytes);
+    DataInputStream in =
+        new DataInputStream(new ByteArrayInputStream(tokenData));
+    try {
+      tokenId.readFields(in);
+      renewDate = in.readLong();
+    } finally {
+      IOUtils.cleanup(LOG, in);
+    }
+    state.tokenState.put(tokenId, renewDate);
+    return tokenId;
+  }
+
+  /**
+   * Loads every token file of one bucket directory. Entries that do not look
+   * like token files are skipped with a warning.
+   *
+   * @return the number of tokens loaded
+   * @throws IOException if the bucket name carries no valid number, if a
+   *         token is found in a bucket it does not belong to, or if a file
+   *         cannot be read
+   */
+  private int loadTokensFromBucket(HistoryServerState state, Path bucket)
+      throws IOException {
+    String numStr =
+        bucket.getName().substring(TOKEN_BUCKET_DIR_PREFIX.length());
+    final int bucketId;
+    try {
+      bucketId = Integer.parseInt(numStr);
+    } catch (NumberFormatException e) {
+      // report a corrupt store the same way as every other load failure
+      throw new IOException("Unexpected token bucket name in store: "
+          + bucket, e);
+    }
+    int numTokens = 0;
+    FileStatus[] tokenStats = fs.listStatus(bucket);
+    for (FileStatus stat : tokenStats) {
+      String name = stat.getPath().getName();
+      if (name.startsWith(TOKEN_FILE_PREFIX)) {
+        MRDelegationTokenIdentifier token =
+            loadToken(state, stat.getPath(), stat.getLen());
+        int tokenBucketId = getBucketId(token);
+        if (tokenBucketId != bucketId) {
+          throw new IOException("Token " + stat.getPath()
+              + " should be in bucket " + tokenBucketId + ", found in bucket "
+              + bucketId);
+        }
+        ++numTokens;
+      } else {
+        LOG.warn("Skipping unexpected file in history server token bucket: "
+            + stat.getPath());
+      }
+    }
+    return numTokens;
+  }
+
+  /**
+   * Loads every master key file from the key directory.
+   *
+   * @return the number of keys loaded
+   */
+  private int loadKeys(HistoryServerState state) throws IOException {
+    FileStatus[] stats = fs.listStatus(tokenKeysStatePath);
+    int numKeys = 0;
+    for (FileStatus stat : stats) {
+      String name = stat.getPath().getName();
+      if (name.startsWith(TOKEN_MASTER_KEY_FILE_PREFIX)) {
+        loadTokenMasterKey(state, stat.getPath(), stat.getLen());
+        ++numKeys;
+      } else {
+        LOG.warn("Skipping unexpected file in history server token state: "
+            + stat.getPath());
+      }
+    }
+    return numKeys;
+  }
+
+  /**
+   * Loads the tokens of every bucket found in the token directory.
+   *
+   * @return the number of tokens loaded
+   */
+  private int loadTokens(HistoryServerState state) throws IOException {
+    FileStatus[] stats = fs.listStatus(tokenStatePath);
+    int numTokens = 0;
+    for (FileStatus stat : stats) {
+      String name = stat.getPath().getName();
+      if (name.startsWith(TOKEN_BUCKET_DIR_PREFIX)) {
+        numTokens += loadTokensFromBucket(state, stat.getPath());
+      } else if (name.equals(TOKEN_KEYS_DIR_NAME)) {
+        // key loading is done elsewhere
+        continue;
+      } else {
+        LOG.warn("Skipping unexpected file in history server token state: "
+            + stat.getPath());
+      }
+    }
+    return numTokens;
+  }
+
+  /** Loads master keys first, then tokens, and logs how many were found. */
+  private void loadTokenState(HistoryServerState state) throws IOException {
+    int numKeys = loadKeys(state);
+    int numTokens = loadTokens(state);
+    LOG.info("Loaded " + numKeys + " master keys and " + numTokens
+        + " tokens from " + tokenStatePath);
+  }
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerFileSystemStateStoreService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerNullStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerNullStateStoreService.java?rev=1527015&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerNullStateStoreService.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerNullStateStoreService.java Fri Sep 27 18:19:41 2013
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+
+/**
+ * A state store that persists nothing. Every store, update and remove call
+ * is a no-op, and loading state is not supported.
+ */
+@Private
+@Unstable
+public class HistoryServerNullStateStoreService
+    extends HistoryServerStateStoreService {
+
+  @Override
+  protected void initStorage(Configuration conf) throws IOException {
+    // Do nothing, there is no backing storage to set up
+  }
+
+  @Override
+  protected void startStorage() throws IOException {
+    // Do nothing
+  }
+
+  @Override
+  protected void closeStorage() throws IOException {
+    // Do nothing
+  }
+
+  /**
+   * Always fails, since nothing is ever stored.
+   *
+   * @throws UnsupportedOperationException on every call
+   */
+  @Override
+  public HistoryServerState loadState() throws IOException {
+    throw new UnsupportedOperationException(
+        "Cannot load state from null store");
+  }
+
+  @Override
+  public void storeToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
+      throws IOException {
+    // Do nothing, the token is intentionally not persisted
+  }
+
+  @Override
+  public void updateToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
+      throws IOException {
+    // Do nothing
+  }
+
+  @Override
+  public void removeToken(MRDelegationTokenIdentifier tokenId)
+      throws IOException {
+    // Do nothing
+  }
+
+  @Override
+  public void storeTokenMasterKey(DelegationKey key) throws IOException {
+    // Do nothing, the key is intentionally not persisted
+  }
+
+  @Override
+  public void removeTokenMasterKey(DelegationKey key) throws IOException {
+    // Do nothing
+  }
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerNullStateStoreService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreService.java?rev=1527015&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreService.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreService.java Fri Sep 27 18:19:41 2013
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.service.AbstractService;
+
+/**
+ * Base class for history server state storage.
+ * Storage implementations need to implement blocking store and load methods
+ * to actually store and load the state.
+ *
+ * The service lifecycle methods delegate to initStorage(), startStorage()
+ * and closeStorage(), which subclasses implement.
+ */
+@Private
+@Unstable
+public abstract class HistoryServerStateStoreService extends AbstractService {
+
+  /**
+   * Container for state read back from a store: delegation tokens mapped to
+   * their renewal date, and the set of token master keys.
+   */
+  public static class HistoryServerState {
+    // both collections are mutable and are filled in by the store while
+    // loading; the getters below hand out the live instances, not copies
+    Map<MRDelegationTokenIdentifier, Long> tokenState =
+        new HashMap<MRDelegationTokenIdentifier, Long>();
+    Set<DelegationKey> tokenMasterKeyState = new HashSet<DelegationKey>();
+
+    /** @return the map of token identifiers to renewal dates */
+    public Map<MRDelegationTokenIdentifier, Long> getTokenState() {
+      return tokenState;
+    }
+
+    /** @return the set of token master keys */
+    public Set<DelegationKey> getTokenMasterKeyState() {
+      return tokenMasterKeyState;
+    }
+  }
+
+  /** Creates the service, named after this base class. */
+  public HistoryServerStateStoreService() {
+    super(HistoryServerStateStoreService.class.getName());
+  }
+
+  /**
+   * Initialize the state storage by delegating to
+   * {@link #initStorage(Configuration)}.
+   *
+   * @param conf the configuration
+   * @throws IOException if the implementation fails to initialize
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws IOException {
+    initStorage(conf);
+  }
+
+  /**
+   * Start the state storage for use by delegating to
+   * {@link #startStorage()}.
+   *
+   * @throws IOException if the implementation fails to start
+   */
+  @Override
+  public void serviceStart() throws IOException {
+    startStorage();
+  }
+
+  /**
+   * Shutdown the state storage by delegating to {@link #closeStorage()}.
+   *
+   * @throws IOException if the implementation fails to shut down
+   */
+  @Override
+  public void serviceStop() throws IOException {
+    closeStorage();
+  }
+
+  /**
+   * Implementation-specific initialization.
+   *
+   * @param conf the configuration
+   * @throws IOException if initialization fails
+   */
+  protected abstract void initStorage(Configuration conf) throws IOException;
+
+  /**
+   * Implementation-specific startup.
+   *
+   * @throws IOException if startup fails
+   */
+  protected abstract void startStorage() throws IOException;
+
+  /**
+   * Implementation-specific shutdown.
+   *
+   * @throws IOException if shutdown fails
+   */
+  protected abstract void closeStorage() throws IOException;
+
+  /**
+   * Load the history server state from the state storage.
+   *
+   * @return the state found in the storage
+   * @throws IOException if the state cannot be loaded
+   */
+  public abstract HistoryServerState loadState() throws IOException;
+
+  /**
+   * Blocking method to store a delegation token along with its renewal date
+   * to the state storage.
+   *
+   * Implementations must not return from this method until the token has been
+   * committed to the state store.
+   *
+   * @param tokenId the token to store
+   * @param renewDate the token renewal deadline
+   * @throws IOException if the token cannot be stored
+   */
+  public abstract void storeToken(MRDelegationTokenIdentifier tokenId,
+      Long renewDate) throws IOException;
+
+  /**
+   * Blocking method to update the expiration of a delegation token
+   * in the state storage.
+   *
+   * Implementations must not return from this method until the expiration
+   * date of the token has been updated in the state store.
+   *
+   * @param tokenId the token to update
+   * @param renewDate the new token renewal deadline
+   * @throws IOException if the token cannot be updated
+   */
+  public abstract void updateToken(MRDelegationTokenIdentifier tokenId,
+      Long renewDate) throws IOException;
+
+  /**
+   * Blocking method to remove a delegation token from the state storage.
+   *
+   * Implementations must not return from this method until the token has been
+   * removed from the state store.
+   *
+   * @param tokenId the token to remove
+   * @throws IOException if the token cannot be removed
+   */
+  public abstract void removeToken(MRDelegationTokenIdentifier tokenId)
+      throws IOException;
+
+  /**
+   * Blocking method to store a delegation token master key.
+   *
+   * Implementations must not return from this method until the key has been
+   * committed to the state store.
+   *
+   * @param key the master key to store
+   * @throws IOException if the key cannot be stored
+   */
+  public abstract void storeTokenMasterKey(
+      DelegationKey key) throws IOException;
+
+  /**
+   * Blocking method to remove a delegation token master key.
+   *
+   * Implementations must not return from this method until the key has been
+   * removed from the state store.
+   *
+   * @param key the master key to remove
+   * @throws IOException if the key cannot be removed
+   */
+  public abstract void removeTokenMasterKey(DelegationKey key)
+      throws IOException;
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreServiceFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreServiceFactory.java?rev=1527015&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreServiceFactory.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreServiceFactory.java Fri Sep 27 18:19:41 2013
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class HistoryServerStateStoreServiceFactory {
+
+  /**
+   * Constructs the configured store, or a null store if recovery is disabled
+   * 
+   * @param conf the configuration
+   * @return the state storage instance
+   */
+  public static HistoryServerStateStoreService getStore(Configuration conf) {
+    Class<? extends HistoryServerStateStoreService> storeClass =
+        HistoryServerNullStateStoreService.class;
+    boolean recoveryEnabled = conf.getBoolean(
+        JHAdminConfig.MR_HS_RECOVERY_ENABLE,
+        JHAdminConfig.DEFAULT_MR_HS_RECOVERY_ENABLE);
+    if (recoveryEnabled) {
+      storeClass = conf.getClass(JHAdminConfig.MR_HS_STATE_STORE, null,
+          HistoryServerStateStoreService.class);
+      if (storeClass == null) { // recovery enabled but no store configured
+        throw new RuntimeException("Unable to locate storage class, check "
+            + JHAdminConfig.MR_HS_STATE_STORE);
+      }
+    }
+    return ReflectionUtils.newInstance(storeClass, conf);
+  }
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerStateStoreServiceFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JHSDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JHSDelegationTokenSecretManager.java?rev=1527015&r1=1527014&r2=1527015&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JHSDelegationTokenSecretManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JHSDelegationTokenSecretManager.java Fri Sep 27 18:19:41 2013
@@ -18,10 +18,17 @@
 
 package org.apache.hadoop.mapreduce.v2.hs;
 
+import java.io.IOException;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
 
 /**
  * A MapReduce specific delegation token secret manager.
@@ -33,6 +40,11 @@ import org.apache.hadoop.security.token.
 public class JHSDelegationTokenSecretManager
     extends AbstractDelegationTokenSecretManager<MRDelegationTokenIdentifier> {
 
+  private static final Log LOG = LogFactory.getLog(
+      JHSDelegationTokenSecretManager.class);
+
+  private HistoryServerStateStoreService store;
+
   /**
    * Create a secret manager
    * @param delegationKeyUpdateInterval the number of seconds for rolling new
@@ -42,17 +54,94 @@ public class JHSDelegationTokenSecretMan
    * @param delegationTokenRenewInterval how often the tokens must be renewed
    * @param delegationTokenRemoverScanInterval how often the tokens are scanned
    *        for expired tokens
+   * @param store history server state store for persisting state
    */
   public JHSDelegationTokenSecretManager(long delegationKeyUpdateInterval,
                                       long delegationTokenMaxLifetime, 
                                       long delegationTokenRenewInterval,
-                                      long delegationTokenRemoverScanInterval) {
+                                      long delegationTokenRemoverScanInterval,
+                                      HistoryServerStateStoreService store) {
     super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
           delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+    this.store = store;
   }
 
   @Override
   public MRDelegationTokenIdentifier createIdentifier() {
     return new MRDelegationTokenIdentifier();
   }
+
+  @Override
+  protected void storeNewMasterKey(DelegationKey key) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing master key " + key.getKeyId());
+    }
+    try {
+      store.storeTokenMasterKey(key);
+    } catch (IOException e) {
+      LOG.error("Unable to store master key " + key.getKeyId(), e);
+    }
+  }
+
+  @Override
+  protected void removeStoredMasterKey(DelegationKey key) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing master key " + key.getKeyId());
+    }
+    try {
+      store.removeTokenMasterKey(key);
+    } catch (IOException e) {
+      LOG.error("Unable to remove master key " + key.getKeyId(), e);
+    }
+  }
+
+  @Override
+  protected void storeNewToken(MRDelegationTokenIdentifier tokenId,
+      long renewDate) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing token " + tokenId.getSequenceNumber());
+    }
+    try {
+      store.storeToken(tokenId, renewDate);
+    } catch (IOException e) {
+      LOG.error("Unable to store token " + tokenId.getSequenceNumber(), e);
+    }
+  }
+
+  @Override
+  protected void removeStoredToken(MRDelegationTokenIdentifier tokenId)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing token " + tokenId.getSequenceNumber());
+    }
+    try {
+      store.removeToken(tokenId);
+    } catch (IOException e) { // logged rather than rethrown
+      LOG.error("Unable to remove token " + tokenId.getSequenceNumber(), e);
+    }
+  }
+
+  @Override
+  protected void updateStoredToken(MRDelegationTokenIdentifier tokenId,
+      long renewDate) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Updating token " + tokenId.getSequenceNumber());
+    }
+    try {
+      store.updateToken(tokenId, renewDate);
+    } catch (IOException e) {
+      LOG.error("Unable to update token " + tokenId.getSequenceNumber(), e);
+    }
+  }
+
+  public void recover(HistoryServerState state) throws IOException {
+    LOG.info("Recovering " + getClass().getSimpleName());
+    for (DelegationKey key : state.tokenMasterKeyState) {
+      addKey(key);
+    }
+    for (Entry<MRDelegationTokenIdentifier, Long> entry :
+        state.tokenState.entrySet()) {
+      addPersistedDelegationToken(entry.getKey(), entry.getValue());
+    }
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1527015&r1=1527014&r2=1527015&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java Fri Sep 27 18:19:41 2013
@@ -28,11 +28,13 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.v2.app.webapp.WebAppUtil;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState;
 import org.apache.hadoop.mapreduce.v2.hs.server.HSAdminServer;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ShutdownHookManager;
@@ -64,6 +66,46 @@ public class JobHistoryServer extends Co
   private JHSDelegationTokenSecretManager jhsDTSecretManager;
   private AggregatedLogDeletionService aggLogDelService;
   private HSAdminServer hsAdminServer;
+  private HistoryServerStateStoreService stateStore;
+
+  // utility class to start and stop secret manager as part of service
+  // framework and implement state recovery for secret manager on startup
+  private class HistoryServerSecretManagerService
+      extends AbstractService {
+
+    public HistoryServerSecretManagerService() {
+      super(HistoryServerSecretManagerService.class.getName());
+    }
+
+    @Override
+    protected void serviceStart() throws Exception {
+      boolean recoveryEnabled = getConfig().getBoolean(
+          JHAdminConfig.MR_HS_RECOVERY_ENABLE,
+          JHAdminConfig.DEFAULT_MR_HS_RECOVERY_ENABLE);
+      if (recoveryEnabled) {
+        assert stateStore.isInState(STATE.STARTED);
+        HistoryServerState state = stateStore.loadState();
+        jhsDTSecretManager.recover(state);
+      }
+
+      try {
+        jhsDTSecretManager.startThreads();
+      } catch(IOException io) {
+        LOG.error("Error while starting the Secret Manager threads", io);
+        throw io;
+      }
+
+      super.serviceStart();
+    }
+
+    @Override
+    protected void serviceStop() throws Exception {
+      if (jhsDTSecretManager != null) {
+        jhsDTSecretManager.stopThreads();
+      }
+      super.serviceStop();
+    }
+  }
 
   public JobHistoryServer() {
     super(JobHistoryServer.class.getName());
@@ -86,11 +128,14 @@ public class JobHistoryServer extends Co
     }
     jobHistoryService = new JobHistory();
     historyContext = (HistoryContext)jobHistoryService;
-    this.jhsDTSecretManager = createJHSSecretManager(conf);
+    stateStore = createStateStore(conf);
+    this.jhsDTSecretManager = createJHSSecretManager(conf, stateStore);
     clientService = new HistoryClientService(historyContext, 
         this.jhsDTSecretManager);
     aggLogDelService = new AggregatedLogDeletionService();
     hsAdminServer = new HSAdminServer(aggLogDelService, jobHistoryService);
+    addService(stateStore);
+    addService(new HistoryServerSecretManagerService());
     addService(jobHistoryService);
     addService(clientService);
     addService(aggLogDelService);
@@ -99,7 +144,7 @@ public class JobHistoryServer extends Co
   }
 
   protected JHSDelegationTokenSecretManager createJHSSecretManager(
-      Configuration conf) {
+      Configuration conf, HistoryServerStateStoreService store) {
     long secretKeyInterval = 
         conf.getLong(MRConfig.DELEGATION_KEY_UPDATE_INTERVAL_KEY, 
                      MRConfig.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
@@ -111,9 +156,14 @@ public class JobHistoryServer extends Co
                      MRConfig.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
       
     return new JHSDelegationTokenSecretManager(secretKeyInterval, 
-        tokenMaxLifetime, tokenRenewInterval, 3600000);
+        tokenMaxLifetime, tokenRenewInterval, 3600000, store);
   }
-  
+
+  protected HistoryServerStateStoreService createStateStore(
+      Configuration conf) {
+    return HistoryServerStateStoreServiceFactory.getStore(conf);
+  }
+
   protected void doSecureLogin(Configuration conf) throws IOException {
     SecurityUtil.login(conf, JHAdminConfig.MR_HISTORY_KEYTAB,
         JHAdminConfig.MR_HISTORY_PRINCIPAL);
@@ -123,20 +173,11 @@ public class JobHistoryServer extends Co
   protected void serviceStart() throws Exception {
     DefaultMetricsSystem.initialize("JobHistoryServer");
     JvmMetrics.initSingleton("JobHistoryServer", null);
-    try {
-      jhsDTSecretManager.startThreads();
-    } catch(IOException io) {
-      LOG.error("Error while starting the Secret Manager threads", io);
-      throw io;
-    }
     super.serviceStart();
   }
   
   @Override
   protected void serviceStop() throws Exception {
-    if (jhsDTSecretManager != null) {
-      jhsDTSecretManager.stopThreads();
-    }
     DefaultMetricsSystem.shutdown();
     super.serviceStop();
   }

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerMemStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerMemStateStoreService.java?rev=1527015&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerMemStateStoreService.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerMemStateStoreService.java Fri Sep 27 18:19:41 2013
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+
+/**
+ * A state store backed by memory for unit tests
+ */
+class HistoryServerMemStateStoreService
+    extends HistoryServerStateStoreService {
+
+  HistoryServerState state;
+
+  @Override
+  protected void initStorage(Configuration conf) throws IOException {
+  }
+
+  @Override
+  protected void startStorage() throws IOException {
+    state = new HistoryServerState();
+  }
+
+  @Override
+  protected void closeStorage() throws IOException {
+    state = null;
+  }
+
+  @Override
+  public HistoryServerState loadState() throws IOException {
+    HistoryServerState result = new HistoryServerState();
+    result.tokenState.putAll(state.tokenState);
+    result.tokenMasterKeyState.addAll(state.tokenMasterKeyState);
+    return result;
+  }
+
+  @Override
+  public void storeToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
+      throws IOException {
+    if (state.tokenState.containsKey(tokenId)) {
+      throw new IOException("token " + tokenId + " was stored twice");
+    }
+    state.tokenState.put(tokenId, renewDate);
+  }
+
+  @Override
+  public void updateToken(MRDelegationTokenIdentifier tokenId, Long renewDate)
+      throws IOException {
+    if (!state.tokenState.containsKey(tokenId)) {
+      throw new IOException("token " + tokenId + " not in store");
+    }
+    state.tokenState.put(tokenId, renewDate);
+  }
+
+  @Override
+  public void removeToken(MRDelegationTokenIdentifier tokenId)
+      throws IOException {
+    state.tokenState.remove(tokenId);
+  }
+
+  @Override
+  public void storeTokenMasterKey(DelegationKey key) throws IOException {
+    if (state.tokenMasterKeyState.contains(key)) {
+      throw new IOException("token master key " + key + " was stored twice");
+    }
+    state.tokenMasterKeyState.add(key);
+  }
+
+  @Override
+  public void removeTokenMasterKey(DelegationKey key) throws IOException {
+    state.tokenMasterKeyState.remove(key);
+  }
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/HistoryServerMemStateStoreService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java?rev=1527015&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java Fri Sep 27 18:19:41 2013
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService.HistoryServerState;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHistoryServerFileSystemStateStoreService {
+
+  private static final File testDir = new File(
+      System.getProperty("test.build.data",
+          System.getProperty("java.io.tmpdir")),
+      "TestHistoryServerFileSystemStateStoreService");
+
+  private Configuration conf;
+
+  @Before
+  public void setup() {
+    FileUtil.fullyDelete(testDir);
+    testDir.mkdirs();
+    conf = new Configuration();
+    conf.setBoolean(JHAdminConfig.MR_HS_RECOVERY_ENABLE, true);
+    conf.setClass(JHAdminConfig.MR_HS_STATE_STORE,
+        HistoryServerFileSystemStateStoreService.class,
+        HistoryServerStateStoreService.class);
+    conf.set(JHAdminConfig.MR_HS_FS_STATE_STORE_URI,
+        testDir.getAbsoluteFile().toURI().toString());
+  }
+
+  @After
+  public void cleanup() {
+    FileUtil.fullyDelete(testDir);
+  }
+
+  private HistoryServerStateStoreService createAndStartStore()
+      throws IOException {
+    HistoryServerStateStoreService store =
+        HistoryServerStateStoreServiceFactory.getStore(conf);
+    assertTrue("Factory did not create a filesystem store",
+        store instanceof HistoryServerFileSystemStateStoreService);
+    store.init(conf);
+    store.start();
+    return store;
+  }
+
+  @Test
+  public void testTokenStore() throws IOException {
+    HistoryServerStateStoreService store = createAndStartStore();
+
+    HistoryServerState state = store.loadState();
+    assertTrue("token state not empty", state.tokenState.isEmpty());
+    assertTrue("key state not empty", state.tokenMasterKeyState.isEmpty());
+
+    final DelegationKey key1 = new DelegationKey(1, 2, "keyData1".getBytes());
+    final MRDelegationTokenIdentifier token1 =
+        new MRDelegationTokenIdentifier(new Text("tokenOwner1"),
+            new Text("tokenRenewer1"), new Text("tokenUser1"));
+    token1.setSequenceNumber(1);
+    final Long tokenDate1 = 1L;
+    final MRDelegationTokenIdentifier token2 =
+        new MRDelegationTokenIdentifier(new Text("tokenOwner2"),
+            new Text("tokenRenewer2"), new Text("tokenUser2"));
+    token2.setSequenceNumber(12345678);
+    final Long tokenDate2 = 87654321L;
+
+    store.storeTokenMasterKey(key1);
+    try {
+      store.storeTokenMasterKey(key1);
+      fail("redundant store of key undetected");
+    } catch (IOException e) {
+      // expected
+    }
+    store.storeToken(token1, tokenDate1);
+    store.storeToken(token2, tokenDate2);
+    try {
+      store.storeToken(token1, tokenDate1);
+      fail("redundant store of token undetected");
+    } catch (IOException e) {
+      // expected
+    }
+    store.close();
+
+    store = createAndStartStore();
+    state = store.loadState();
+    assertEquals("incorrect loaded token count", 2, state.tokenState.size());
+    assertTrue("missing token 1", state.tokenState.containsKey(token1));
+    assertEquals("incorrect token 1 date", tokenDate1,
+        state.tokenState.get(token1));
+    assertTrue("missing token 2", state.tokenState.containsKey(token2));
+    assertEquals("incorrect token 2 date", tokenDate2,
+        state.tokenState.get(token2));
+    assertEquals("incorrect master key count", 1,
+        state.tokenMasterKeyState.size());
+    assertTrue("missing master key 1",
+        state.tokenMasterKeyState.contains(key1));
+
+    final DelegationKey key2 = new DelegationKey(3, 4, "keyData2".getBytes());
+    final DelegationKey key3 = new DelegationKey(5, 6, "keyData3".getBytes());
+    final MRDelegationTokenIdentifier token3 =
+        new MRDelegationTokenIdentifier(new Text("tokenOwner3"),
+            new Text("tokenRenewer3"), new Text("tokenUser3"));
+    token3.setSequenceNumber(12345679);
+    final Long tokenDate3 = 87654321L;
+
+    store.removeToken(token1);
+    store.storeTokenMasterKey(key2);
+    final Long newTokenDate2 = 975318642L;
+    store.updateToken(token2, newTokenDate2);
+    store.removeTokenMasterKey(key1);
+    store.storeTokenMasterKey(key3);
+    store.storeToken(token3, tokenDate3);
+    store.close();
+
+    store = createAndStartStore();
+    state = store.loadState();
+    assertEquals("incorrect loaded token count", 2, state.tokenState.size());
+    assertFalse("token 1 not removed", state.tokenState.containsKey(token1));
+    assertTrue("missing token 2", state.tokenState.containsKey(token2));
+    assertEquals("incorrect token 2 date", newTokenDate2,
+        state.tokenState.get(token2));
+    assertTrue("missing token 3", state.tokenState.containsKey(token3));
+    assertEquals("incorrect token 3 date", tokenDate3,
+        state.tokenState.get(token3));
+    assertEquals("incorrect master key count", 2,
+        state.tokenMasterKeyState.size());
+    assertFalse("master key 1 not removed",
+        state.tokenMasterKeyState.contains(key1));
+    assertTrue("missing master key 2",
+        state.tokenMasterKeyState.contains(key2));
+    assertTrue("missing master key 3",
+        state.tokenMasterKeyState.contains(key3));
+  }
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryServerFileSystemStateStoreService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJHSDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJHSDelegationTokenSecretManager.java?rev=1527015&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJHSDelegationTokenSecretManager.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJHSDelegationTokenSecretManager.java Fri Sep 27 18:19:41 2013
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.junit.Test;
+
+public class TestJHSDelegationTokenSecretManager {
+
+  @Test
+  public void testRecovery() throws IOException {
+    Configuration conf = new Configuration();
+    HistoryServerStateStoreService store =
+        new HistoryServerMemStateStoreService();
+    store.init(conf);
+    store.start();
+    JHSDelegationTokenSecretManagerForTest mgr =
+        new JHSDelegationTokenSecretManagerForTest(store);
+    mgr.startThreads();
+
+    MRDelegationTokenIdentifier tokenId1 = new MRDelegationTokenIdentifier(
+        new Text("tokenOwner"), new Text("tokenRenewer"),
+        new Text("tokenUser"));
+    Token<MRDelegationTokenIdentifier> token1 =
+        new Token<MRDelegationTokenIdentifier>(tokenId1, mgr);
+
+    MRDelegationTokenIdentifier tokenId2 = new MRDelegationTokenIdentifier(
+        new Text("tokenOwner"), new Text("tokenRenewer"),
+        new Text("tokenUser"));
+    Token<MRDelegationTokenIdentifier> token2 =
+        new Token<MRDelegationTokenIdentifier>(tokenId2, mgr);
+    DelegationKey[] keys = mgr.getAllKeys();
+    long tokenRenewDate1 = mgr.getAllTokens().get(tokenId1).getRenewDate();
+    long tokenRenewDate2 = mgr.getAllTokens().get(tokenId2).getRenewDate();
+    mgr.stopThreads();
+
+    mgr = new JHSDelegationTokenSecretManagerForTest(store);
+    mgr.recover(store.loadState());
+    List<DelegationKey> recoveredKeys = Arrays.asList(mgr.getAllKeys());
+    for (DelegationKey key : keys) {
+      assertTrue("key missing after recovery", recoveredKeys.contains(key));
+    }
+    assertTrue("token1 missing", mgr.getAllTokens().containsKey(tokenId1));
+    assertEquals("token1 renew date", tokenRenewDate1,
+        mgr.getAllTokens().get(tokenId1).getRenewDate());
+    assertTrue("token2 missing", mgr.getAllTokens().containsKey(tokenId2));
+    assertEquals("token2 renew date", tokenRenewDate2,
+        mgr.getAllTokens().get(tokenId2).getRenewDate());
+
+    mgr.startThreads();
+    mgr.verifyToken(tokenId1, token1.getPassword());
+    mgr.verifyToken(tokenId2, token2.getPassword());
+    MRDelegationTokenIdentifier tokenId3 = new MRDelegationTokenIdentifier(
+        new Text("tokenOwner"), new Text("tokenRenewer"),
+        new Text("tokenUser"));
+    Token<MRDelegationTokenIdentifier> token3 =
+        new Token<MRDelegationTokenIdentifier>(tokenId3, mgr);
+    assertEquals("sequence number restore", tokenId2.getSequenceNumber() + 1,
+        tokenId3.getSequenceNumber());
+    mgr.cancelToken(token1, "tokenOwner");
+    long tokenRenewDate3 = mgr.getAllTokens().get(tokenId3).getRenewDate();
+    mgr.stopThreads();
+
+    mgr = new JHSDelegationTokenSecretManagerForTest(store);
+    mgr.recover(store.loadState());
+    assertFalse("token1 should be missing",
+        mgr.getAllTokens().containsKey(tokenId1));
+    assertTrue("token2 missing", mgr.getAllTokens().containsKey(tokenId2));
+    assertEquals("token2 renew date", tokenRenewDate2,
+        mgr.getAllTokens().get(tokenId2).getRenewDate());
+    assertTrue("token3 missing", mgr.getAllTokens().containsKey(tokenId3));
+    assertEquals("token3 renew date", tokenRenewDate3,
+        mgr.getAllTokens().get(tokenId3).getRenewDate());
+
+    mgr.startThreads();
+    mgr.verifyToken(tokenId2, token2.getPassword());
+    mgr.verifyToken(tokenId3, token3.getPassword());
+    mgr.stopThreads();
+ }
+
+  private static class JHSDelegationTokenSecretManagerForTest
+      extends JHSDelegationTokenSecretManager {
+
+    public JHSDelegationTokenSecretManagerForTest(
+        HistoryServerStateStoreService store) {
+      super(10000, 10000, 10000, 10000, store);
+    }
+
+    public Map<MRDelegationTokenIdentifier, DelegationTokenInformation> getAllTokens() {
+      return new HashMap<MRDelegationTokenIdentifier, DelegationTokenInformation>(currentTokens);
+    }
+  }
+}

Propchange: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJHSDelegationTokenSecretManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java?rev=1527015&r1=1527014&r2=1527015&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryServer.java Fri Sep 27 18:19:41 2013
@@ -72,7 +72,7 @@ public class TestJobHistoryServer {
     Configuration config = new Configuration();
     historyServer.init(config);
     assertEquals(STATE.INITED, historyServer.getServiceState());
-    assertEquals(4, historyServer.getServices().size());
+    assertEquals(6, historyServer.getServices().size());
     HistoryClientService historyService = historyServer.getClientService();
     assertNotNull(historyServer.getClientService());
     assertEquals(STATE.INITED, historyService.getServiceState());

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java?rev=1527015&r1=1527014&r2=1527015&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java Fri Sep 27 18:19:41 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.hs.HistoryServerStateStoreService;
 import org.apache.hadoop.mapreduce.v2.hs.JHSDelegationTokenSecretManager;
 import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
@@ -87,10 +88,11 @@ public class TestJHSSecurity {
           // no keytab based login
         };
 
+        @Override
         protected JHSDelegationTokenSecretManager createJHSSecretManager(
-            Configuration conf) {
+            Configuration conf, HistoryServerStateStoreService store) {
           return new JHSDelegationTokenSecretManager(initialInterval, 
-              maxLifetime, renewInterval, 3600000);
+              maxLifetime, renewInterval, 3600000, store);
         }
       };
 //      final JobHistoryServer jobHistoryServer = jhServer;