You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2023/02/22 22:02:45 UTC

[hadoop] branch branch-3.3 updated: HADOOP-18535. Implement token storage solution based on MySQL

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new eab72153546 HADOOP-18535. Implement token storage solution based on MySQL
eab72153546 is described below

commit eab721535466b887ca37fd9fdf71393af45cbbe7
Author: hchaverr <hc...@linkedin.com>
AuthorDate: Tue Jan 24 10:43:36 2023 -0800

    HADOOP-18535. Implement token storage solution based on MySQL
    
    Fixes #1240
    
    Signed-off-by: Owen O'Malley <oo...@linkedin.com>
---
 .../AbstractDelegationTokenSecretManager.java      |  36 +-
 .../SQLDelegationTokenSecretManager.java           | 400 +++++++++++++++++
 hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml        |  22 +
 .../TokenStore/MySQL/TokenStoreDatabase.sql        |  21 +
 .../scripts/TokenStore/MySQL/TokenStoreTables.sql  |  52 +++
 .../scripts/TokenStore/MySQL/TokenStoreUser.sql    |  26 ++
 .../hadoop-hdfs-rbf/scripts/TokenStore/README      |  24 ++
 .../security/token/DistributedSQLCounter.java      | 138 ++++++
 .../token/HikariDataSourceConnectionFactory.java   |  68 +++
 .../security/token/SQLConnectionFactory.java       |  54 +++
 .../token/SQLDelegationTokenSecretManagerImpl.java | 242 +++++++++++
 .../token/SQLSecretManagerRetriableHandler.java    | 133 ++++++
 .../TestSQLDelegationTokenSecretManagerImpl.java   | 471 +++++++++++++++++++++
 hadoop-project/pom.xml                             |  12 +
 14 files changed, 1697 insertions(+), 2 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
index d5f0e6007c2..61c3312c107 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
@@ -19,7 +19,9 @@
 package org.apache.hadoop.security.token.delegation;
 
 import java.io.ByteArrayInputStream;
+import java.io.DataInput;
 import java.io.DataInputStream;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.security.MessageDigest;
 import java.util.Collection;
@@ -38,6 +40,8 @@ import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
 import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -428,8 +432,9 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent
   /** 
    * Update the current master key for generating delegation tokens 
    * It should be called only by tokenRemoverThread.
+   * @throws IOException raised on errors performing I/O.
    */
-  void rollMasterKey() throws IOException {
+  protected void rollMasterKey() throws IOException {
     synchronized (this) {
       removeExpiredKeys();
       /* set final expiry date for retiring currentKey */
@@ -664,11 +669,15 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent
 
   /** Class to encapsulate a token's renew date and password. */
   @InterfaceStability.Evolving
-  public static class DelegationTokenInformation {
+  public static class DelegationTokenInformation implements Writable {
     long renewDate;
     byte[] password;
     String trackingId;
 
+    public DelegationTokenInformation() {
+      this(0, null); // empty instance; callers fill it through readFields()
+    }
+
     public DelegationTokenInformation(long renewDate, byte[] password) {
       this(renewDate, password, null);
     }
@@ -698,6 +707,29 @@ public abstract class AbstractDelegationTokenSecretManager<TokenIdent
     public String getTrackingId() {
       return trackingId;
     }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      WritableUtils.writeVLong(out, renewDate);
+      if (password == null) {
+        WritableUtils.writeVInt(out, -1);
+      } else {
+        WritableUtils.writeVInt(out, password.length);
+        out.write(password);
+      }
+      WritableUtils.writeString(out, trackingId);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      renewDate = WritableUtils.readVLong(in);
+      int len = WritableUtils.readVInt(in); // -1 encodes a null password
+      password = (len > -1) ? new byte[len] : null; // never keep stale bytes on reuse
+      if (password != null) {
+        in.readFully(password);
+      }
+      trackingId = WritableUtils.readString(in);
+    }
   }
   
   /** Remove expired delegation tokens from cache */
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java
new file mode 100644
index 00000000000..4b6ae21d7a9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token.delegation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of {@link AbstractDelegationTokenSecretManager} that
+ * persists TokenIdentifiers and DelegationKeys in an existing SQL database.
+ */
+public abstract class SQLDelegationTokenSecretManager<TokenIdent
+    extends AbstractDelegationTokenIdentifier>
+    extends AbstractDelegationTokenSecretManager<TokenIdent> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SQLDelegationTokenSecretManager.class);
+
+  public static final String SQL_DTSM_CONF_PREFIX = "sql-dt-secret-manager.";
+  private static final String SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE = SQL_DTSM_CONF_PREFIX
+      + "token.seqnum.batch.size";
+  public static final int DEFAULT_SEQ_NUM_BATCH_SIZE = 10;
+
+  // Batch of sequence numbers that will be requested by the sequenceNumCounter.
+  // A new batch is requested once the sequenceNums available to a secret manager are
+  // exhausted, including during initialization.
+  private final int seqNumBatchSize;
+
+  // Last sequenceNum in the current batch that has been allocated to a token.
+  private int currentSeqNum;
+
+  // Max sequenceNum in the current batch that can be allocated to a token.
+  // Unused sequenceNums in the current batch cannot be reused by other routers.
+  private int currentMaxSeqNum;
+
+  public SQLDelegationTokenSecretManager(Configuration conf) {
+    super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
+            DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000,
+        conf.getLong(DelegationTokenManager.MAX_LIFETIME,
+            DelegationTokenManager.MAX_LIFETIME_DEFAULT) * 1000,
+        conf.getLong(DelegationTokenManager.RENEW_INTERVAL,
+            DelegationTokenManager.RENEW_INTERVAL_DEFAULT) * 1000,
+        conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
+            DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
+    // NOTE(review): the "* 1000" factors presumably convert seconds to milliseconds - confirm.
+    this.seqNumBatchSize = conf.getInt(SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
+        DEFAULT_SEQ_NUM_BATCH_SIZE);
+  }
+
+  /**
+   * Serializes the TokenInformation and stores it with its TokenIdentifier,
+   * in the SQL database first and in the local cache second.
+   * @param ident TokenIdentifier to persist.
+   * @param tokenInfo DelegationTokenInformation associated with the TokenIdentifier.
+   * @throws IOException on serialization errors or when the SQL insert fails.
+   */
+  @Override
+  protected void storeToken(TokenIdent ident,
+      DelegationTokenInformation tokenInfo) throws IOException {
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream serializer = new DataOutputStream(buffer)) {
+      tokenInfo.write(serializer);
+      byte[] serializedInfo = buffer.toByteArray();
+      // The database is written before the cache, so a failed insert leaves the cache untouched.
+      insertToken(ident.getSequenceNumber(), ident.getBytes(), serializedInfo);
+      super.storeToken(ident, tokenInfo);
+    } catch (SQLException e) {
+      throw new IOException("Failed to store token in SQL secret manager", e);
+    }
+  }
+
+  /**
+   * Replaces the TokenInformation stored for an existing TokenIdentifier,
+   * in the SQL database first and in the local cache second.
+   * @param ident Existing TokenIdentifier in the SQL database.
+   * @param tokenInfo Updated DelegationTokenInformation associated with the TokenIdentifier.
+   * @throws IOException on serialization errors or when the SQL update fails.
+   */
+  @Override
+  protected void updateToken(TokenIdent ident,
+      DelegationTokenInformation tokenInfo) throws IOException {
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream serializer = new DataOutputStream(buffer)) {
+      tokenInfo.write(serializer);
+      byte[] serializedInfo = buffer.toByteArray();
+      // Push the new value to the database before touching the cache.
+      updateToken(ident.getSequenceNumber(), ident.getBytes(), serializedInfo);
+      super.updateToken(ident, tokenInfo);
+    } catch (SQLException e) {
+      throw new IOException("Failed to update token in SQL secret manager", e);
+    }
+  }
+
+  /**
+   * Cancels a token. The identifier is decoded from the token and looked up via
+   * getTokenInfo() first, then the parent implementation performs the cancellation.
+   * @param token Token to cancel.
+   * @param canceller Passed through unchanged to the parent implementation.
+   * @return Identifier of the canceled token
+   * @throws IOException if the identifier cannot be decoded or the parent call fails.
+   */
+  @Override
+  public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
+      String canceller) throws IOException {
+    TokenIdent decodedIdent;
+    try (DataInputStream identStream = new DataInputStream(
+        new ByteArrayInputStream(token.getIdentifier()))) {
+      decodedIdent = createIdentifier();
+      decodedIdent.readFields(identStream);
+    }
+    // super.cancelToken() requires the token in the local cache; load it if not present.
+    getTokenInfo(decodedIdent);
+    return super.cancelToken(token, canceller);
+  }
+
+  /**
+   * Deletes the given TokenIdentifier from the SQL database to invalidate
+   * it. A SQLException is logged as a warning and not rethrown.
+   * @param ident TokenIdentifier to remove from the SQL database.
+   */
+  @Override
+  protected void removeStoredToken(TokenIdent ident) throws IOException {
+    try {
+      deleteToken(ident.getSequenceNumber(), ident.getBytes());
+    } catch (SQLException e) {
+      LOG.warn("Failed to remove token in SQL secret manager", e);
+    }
+  }
+
+  /**
+   * Obtains the DelegationTokenInformation associated with the given
+   * TokenIdentifier, from the local cache or else from the SQL database.
+   * @param ident Existing TokenIdentifier in the SQL database.
+   * @return DelegationTokenInformation that matches the given TokenIdentifier or
+   *         null if it doesn't exist in the database or could not be loaded.
+   */
+  @Override
+  protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
+    // Look for token in local cache
+    DelegationTokenInformation tokenInfo = super.getTokenInfo(ident);
+
+    if (tokenInfo == null) {
+      try {
+        // Look for token in SQL database
+        byte[] tokenInfoBytes = selectTokenInfo(ident.getSequenceNumber(), ident.getBytes());
+        if (tokenInfoBytes != null) {
+          // Deserialize into a local first: if readFields() fails, the half-read
+          // object must not be returned to the caller as a valid token.
+          DelegationTokenInformation loaded = new DelegationTokenInformation();
+          try (DataInputStream dis = new DataInputStream(
+              new ByteArrayInputStream(tokenInfoBytes))) {
+            loaded.readFields(dis);
+          }
+          tokenInfo = loaded;
+          // Update token in local cache
+          currentTokens.put(ident, tokenInfo);
+        }
+      } catch (IOException | SQLException e) {
+        LOG.error("Failed to get token in SQL secret manager", e);
+      }
+    }
+
+    return tokenInfo;
+  }
+
+  /**
+   * Reads the last reserved sequence number through selectSequenceNum().
+   * @return Last reserved sequence number; a SQLException is rethrown as RuntimeException.
+   */
+  @Override
+  public int getDelegationTokenSeqNum() {
+    try {
+      return selectSequenceNum();
+    } catch (SQLException sqlFailure) {
+      throw new RuntimeException(
+          "Failed to get token sequence number in SQL secret manager", sqlFailure);
+    }
+  }
+
+  /**
+   * Overwrites the last reserved sequence number through updateSequenceNum().
+   * @param seqNum New value; a SQLException is rethrown as RuntimeException.
+   */
+  @Override
+  public void setDelegationTokenSeqNum(int seqNum) {
+    try {
+      updateSequenceNum(seqNum);
+    } catch (SQLException sqlFailure) {
+      throw new RuntimeException(
+          "Failed to update token sequence number in SQL secret manager", sqlFailure);
+    }
+  }
+
+  /**
+   * Obtains the next available sequence number that can be allocated to a Token.
+   * Sequence numbers need to be reserved using the shared sequenceNumberCounter once
+   * the local batch has been exhausted, which handles sequenceNumber allocation
+   * concurrently with other secret managers.
+   * This method ensures that sequence numbers are incremental in a single secret manager,
+   * but not across secret managers.
+   * @return Next available sequence number.
+   */
+  @Override
+  public synchronized int incrementDelegationTokenSeqNum() {
+    if (currentSeqNum >= currentMaxSeqNum) {
+      try {
+        // Request a new batch of sequence numbers and use the
+        // lowest one available.
+        currentSeqNum = incrementSequenceNum(seqNumBatchSize);
+        currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
+      } catch (SQLException e) {
+        throw new RuntimeException(
+            "Failed to increment token sequence number in SQL secret manager", e);
+      }
+    }
+
+    return ++currentSeqNum;
+  }
+
+  /**
+   * Serializes a DelegationKey and stores it under its keyId, in the SQL
+   * database first and in the local cache second.
+   * @param key DelegationKey to persist into the SQL database.
+   * @throws IOException on serialization errors or when the SQL insert fails.
+   */
+  @Override
+  protected void storeDelegationKey(DelegationKey key) throws IOException {
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream serializer = new DataOutputStream(buffer)) {
+      key.write(serializer);
+      byte[] serializedKey = buffer.toByteArray();
+      // Database first, cache second.
+      insertDelegationKey(key.getKeyId(), serializedKey);
+      super.storeDelegationKey(key);
+    } catch (SQLException e) {
+      throw new IOException("Failed to store delegation key in SQL secret manager", e);
+    }
+  }
+
+  /**
+   * Replaces an existing DelegationKey, in the SQL database first and then in the local cache.
+   * @param key Updated DelegationKey.
+   * @throws IOException on serialization errors or when the SQL update fails.
+   */
+  @Override
+  protected void updateDelegationKey(DelegationKey key) throws IOException {
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream serializer = new DataOutputStream(buffer)) {
+      key.write(serializer);
+      byte[] serializedKey = buffer.toByteArray();
+      updateDelegationKey(key.getKeyId(), serializedKey);
+      super.updateDelegationKey(key);
+    } catch (SQLException e) {
+      throw new IOException("Failed to update delegation key in SQL secret manager", e);
+    }
+  }
+
+  /**
+   * Removes the existing DelegationKey from the SQL database to
+   * invalidate it. A SQLException is logged as a warning and not rethrown.
+   * @param key DelegationKey to remove from the SQL database.
+   */
+  @Override
+  protected void removeStoredMasterKey(DelegationKey key) {
+    try {
+      deleteDelegationKey(key.getKeyId());
+    } catch (SQLException e) {
+      LOG.warn("Failed to remove delegation key in SQL secret manager", e);
+    }
+  }
+
+  /**
+   * Obtains the DelegationKey from the local cache or else from the SQL database.
+   * @param keyId KeyId of the DelegationKey to obtain.
+   * @return DelegationKey that matches the given keyId or null if it
+   *         doesn't exist in the database or could not be loaded.
+   */
+  @Override
+  protected DelegationKey getDelegationKey(int keyId) {
+    // Look for delegation key in local cache
+    DelegationKey delegationKey = super.getDelegationKey(keyId);
+
+    if (delegationKey == null) {
+      try {
+        // Look for delegation key in SQL database
+        byte[] delegationKeyBytes = selectDelegationKey(keyId);
+        if (delegationKeyBytes != null) {
+          // Deserialize into a local first: if readFields() fails, the half-read
+          // key must not be returned to the caller as a valid DelegationKey.
+          DelegationKey loaded = new DelegationKey();
+          try (DataInputStream dis = new DataInputStream(
+              new ByteArrayInputStream(delegationKeyBytes))) {
+            loaded.readFields(dis);
+          }
+          delegationKey = loaded;
+          // Update delegation key in local cache
+          allKeys.put(keyId, delegationKey);
+        }
+      } catch (IOException | SQLException e) {
+        LOG.error("Failed to get delegation key in SQL secret manager", e);
+      }
+    }
+
+    return delegationKey;
+  }
+
+  /**
+   * Reads the last delegation key id through selectKeyId().
+   * @return Last delegation key id; a SQLException is rethrown as RuntimeException.
+   */
+  @Override
+  public int getCurrentKeyId() {
+    try {
+      return selectKeyId();
+    } catch (SQLException sqlFailure) {
+      throw new RuntimeException(
+          "Failed to get delegation key id in SQL secret manager", sqlFailure);
+    }
+  }
+
+  /**
+   * Overwrites the last delegation key id through updateKeyId().
+   * @param keyId New value; a SQLException is rethrown as RuntimeException.
+   */
+  @Override
+  public void setCurrentKeyId(int keyId) {
+    try {
+      updateKeyId(keyId);
+    } catch (SQLException sqlFailure) {
+      throw new RuntimeException(
+          "Failed to set delegation key id in SQL secret manager", sqlFailure);
+    }
+  }
+
+  /**
+   * Obtains the next available delegation key id that can be allocated to a DelegationKey.
+   * Delegation key id need to be reserved using the shared delegationKeyIdCounter,
+   * which handles keyId allocation concurrently with other secret managers.
+   * @return Next available delegation key id.
+   */
+  @Override
+  public int incrementCurrentKeyId() {
+    try {
+      return incrementKeyId(1) + 1;
+    } catch (SQLException e) {
+      throw new RuntimeException(
+          "Failed to increment delegation key id in SQL secret manager", e);
+    }
+  }
+
+  // Token operations in SQL database; a null selectTokenInfo() result means "not found".
+  protected abstract byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier)
+      throws SQLException;
+  protected abstract void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
+      throws SQLException;
+  protected abstract void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
+      throws SQLException;
+  protected abstract void deleteToken(int sequenceNum, byte[] tokenIdentifier)
+      throws SQLException;
+  // Delegation key operations in SQL database; selectDelegationKey() may return null.
+  protected abstract byte[] selectDelegationKey(int keyId) throws SQLException;
+  protected abstract void insertDelegationKey(int keyId, byte[] delegationKey)
+      throws SQLException;
+  protected abstract void updateDelegationKey(int keyId, byte[] delegationKey)
+      throws SQLException;
+  protected abstract void deleteDelegationKey(int keyId) throws SQLException;
+  // Counter operations in SQL database; callers use the increment*() result + 1 as next value.
+  protected abstract int selectSequenceNum() throws SQLException;
+  protected abstract void updateSequenceNum(int value) throws SQLException;
+  protected abstract int incrementSequenceNum(int amount) throws SQLException;
+  protected abstract int selectKeyId() throws SQLException;
+  protected abstract void updateKeyId(int value) throws SQLException;
+  protected abstract int incrementKeyId(int amount) throws SQLException;
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
index 349803fadbd..a0ddec5b8c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
@@ -93,6 +93,15 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.zaxxer</groupId>
+      <artifactId>HikariCP-java7</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
@@ -109,6 +118,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>curator-test</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.derby</groupId>
+      <artifactId>derby</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
@@ -126,6 +140,14 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemProperties>
+            <property>
+              <name>derby.stream.error.file</name>
+              <value>${project.build.directory}/derby.log</value>
+            </property>
+          </systemProperties>
+        </configuration>
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreDatabase.sql b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreDatabase.sql
new file mode 100644
index 00000000000..07fea4c24bc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreDatabase.sql
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Script to create a new Database in MySQL for the TokenStore
+
+CREATE DATABASE IF NOT EXISTS TokenStore;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreTables.sql b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreTables.sql
new file mode 100644
index 00000000000..d377c4e15f2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreTables.sql
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Script to generate all the tables for the TokenStore in MySQL
+
+USE TokenStore;
+
+CREATE TABLE IF NOT EXISTS Tokens(
+    sequenceNum int NOT NULL,
+    tokenIdentifier varbinary(255) NOT NULL,
+    tokenInfo varbinary(255) NOT NULL,
+    modifiedTime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY(sequenceNum, tokenIdentifier)
+);
+
+CREATE TABLE IF NOT EXISTS DelegationKeys(
+    keyId int NOT NULL,
+    delegationKey varbinary(255) NOT NULL,
+    modifiedTime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    PRIMARY KEY(keyId)
+);
+
+CREATE TABLE IF NOT EXISTS LastSequenceNum(
+    sequenceNum int NOT NULL
+);
+
+-- Initialize the LastSequenceNum table with a single entry
+INSERT INTO LastSequenceNum (sequenceNum)
+SELECT 0 WHERE NOT EXISTS (SELECT * FROM LastSequenceNum);
+
+CREATE TABLE IF NOT EXISTS LastDelegationKeyId(
+    keyId int NOT NULL
+);
+
+-- Initialize the LastDelegationKeyId table with a single entry
+INSERT INTO LastDelegationKeyId (keyId)
+SELECT 0 WHERE NOT EXISTS (SELECT * FROM LastDelegationKeyId);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreUser.sql b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreUser.sql
new file mode 100644
index 00000000000..844d7a2f944
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreUser.sql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Script to create a new User in MySQL for the TokenStore
+
+-- Update TokenStore user and password on this script
+CREATE USER IF NOT EXISTS 'TokenStoreUser'@'%' IDENTIFIED BY 'TokenStorePassword';
+
+GRANT ALL PRIVILEGES ON TokenStore.* TO 'TokenStoreUser'@'%';
+
+FLUSH PRIVILEGES;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/README b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/README
new file mode 100644
index 00000000000..72425315319
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/README
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+These scripts must be executed, in the order listed, to create the TokenStore database, tables and
+user needed to use SQLDelegationTokenSecretManagerImpl as the delegation token secret manager:
+1. TokenStoreDatabase.sql
+2. TokenStoreTables.sql
+3. TokenStoreUser.sql
+
+Note: The TokenStoreUser.sql defines a default user/password. You are highly encouraged to set
+this to a proper strong password.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/DistributedSQLCounter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/DistributedSQLCounter.java
new file mode 100644
index 00000000000..14b232783f5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/DistributedSQLCounter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed counter that relies on a SQL database to synchronize
+ * between multiple clients. This expects a table with a single int field
+ * to exist in the database. One record must exist on the table at all times,
+ * representing the last used value reserved by a client.
+ */
+public class DistributedSQLCounter {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DistributedSQLCounter.class);
+
+  private final String field;
+  private final String table;
+  private final SQLConnectionFactory connectionFactory;
+
+  /**
+   * Creates a counter stored in the given column of the given table.
+   * Both names are inserted verbatim into the SQL statements issued by this
+   * class, so they must be trusted identifiers and never user input.
+   * @param field Name of the column holding the counter value.
+   * @param table Name of the table holding the counter record.
+   * @param connectionFactory Factory used to obtain database connections.
+   */
+  public DistributedSQLCounter(String field, String table,
+      SQLConnectionFactory connectionFactory) {
+    this.field = field;
+    this.table = table;
+    this.connectionFactory = connectionFactory;
+  }
+
+  /**
+   * Obtains the value of the counter.
+   * @return counter value.
+   * @throws SQLException If the counter cannot be read from the database.
+   * @throws IllegalStateException If the counter table holds no record.
+   */
+  public int selectCounterValue() throws SQLException {
+    try (Connection connection = connectionFactory.getConnection()) {
+      return selectCounterValue(false, connection);
+    }
+  }
+
+  /**
+   * Reads the counter value through the given connection.
+   * @param forUpdate Whether to append "FOR UPDATE" to the query, which is
+   *                  used by {@link #incrementCounterValue(int)} to lock the
+   *                  counter record until its transaction ends.
+   * @param connection Connection to database hosting the counter table.
+   * @return counter value.
+   * @throws SQLException If the query fails.
+   * @throws IllegalStateException If the counter table holds no record.
+   */
+  private int selectCounterValue(boolean forUpdate, Connection connection) throws SQLException {
+    String query = String.format("SELECT %s FROM %s %s", field, table,
+        forUpdate ? "FOR UPDATE" : "");
+    // Parameterized logging avoids building the message when debug is off.
+    LOG.debug("Select counter statement: {}", query);
+    try (Statement statement = connection.createStatement();
+        ResultSet result = statement.executeQuery(query)) {
+      if (result.next()) {
+        return result.getInt(field);
+      } else {
+        throw new IllegalStateException("Counter table not initialized: " + table);
+      }
+    }
+  }
+
+  /**
+   * Sets the counter to the given value.
+   * @param value Value to assign to counter.
+   * @throws SQLException If the counter cannot be updated.
+   */
+  public void updateCounterValue(int value) throws SQLException {
+    try (Connection connection = connectionFactory.getConnection(true)) {
+      updateCounterValue(value, connection);
+    }
+  }
+
+  /**
+   * Sets the counter to the given value.
+   * @param connection Connection to database hosting the counter table.
+   * @param value Value to assign to counter.
+   * @throws SQLException If the counter cannot be updated.
+   */
+  public void updateCounterValue(int value, Connection connection) throws SQLException {
+    String queryText = String.format("UPDATE %s SET %s = ?", table, field);
+    LOG.debug("Update counter statement: {}. Value: {}", queryText, value);
+    try (PreparedStatement statement = connection.prepareStatement(queryText)) {
+      statement.setInt(1, value);
+      statement.execute();
+    }
+  }
+
+  /**
+   * Increments the counter by the given amount and
+   * returns the previous counter value.
+   * @param amount Amount to increase the counter.
+   * @return Previous counter value.
+   * @throws SQLException If the counter cannot be read or updated. When the
+   *                      rollback triggered by a failure also fails, the
+   *                      rollback error is attached as a suppressed exception.
+   */
+  public int incrementCounterValue(int amount) throws SQLException {
+    // Disabling auto-commit to ensure that all statements on this transaction
+    // are committed at once.
+    try (Connection connection = connectionFactory.getConnection(false)) {
+      // Preventing dirty reads and non-repeatable reads to ensure that the
+      // value read will not be updated by a different connection.
+      if (connection.getTransactionIsolation() < Connection.TRANSACTION_REPEATABLE_READ) {
+        connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+      }
+
+      try {
+        // Reading the counter value "FOR UPDATE" to lock the value record,
+        // forcing other connections to wait until this transaction is committed.
+        int lastValue = selectCounterValue(true, connection);
+
+        // Calculate the new counter value and handling overflow by
+        // resetting the counter to 0.
+        int newValue = lastValue + amount;
+        if (newValue < 0) {
+          lastValue = 0;
+          newValue = amount;
+        }
+
+        updateCounterValue(newValue, connection);
+        connection.commit();
+        return lastValue;
+      } catch (Exception e) {
+        // Rollback transaction to release table locks. A failing rollback
+        // must not replace the error that caused it, otherwise the root
+        // cause of the failure is lost.
+        try {
+          connection.rollback();
+        } catch (SQLException rollbackError) {
+          e.addSuppressed(rollbackError);
+        }
+        throw e;
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/HikariDataSourceConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/HikariDataSourceConnectionFactory.java
new file mode 100644
index 00000000000..5510e9f54b9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/HikariDataSourceConnectionFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Class that relies on a HikariDataSource to provide SQL connections.
+ */
+class HikariDataSourceConnectionFactory implements SQLConnectionFactory {
+  protected final static String HIKARI_PROPS = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+      + "connection.hikari.";
+  private final HikariDataSource dataSource;
+
+  /**
+   * Builds the Hikari connection pool from the given configuration.
+   * The connection url, username, password and driver are copied to the
+   * "jdbcUrl", "username", "password" and "driverClassName" Hikari properties
+   * when they are configured, followed by every property found under the
+   * {@link #HIKARI_PROPS} prefix.
+   * @param conf Configuration holding the connection properties.
+   */
+  HikariDataSourceConnectionFactory(Configuration conf) {
+    Properties properties = new Properties();
+    setIfConfigured(properties, "jdbcUrl", conf.get(CONNECTION_URL));
+    setIfConfigured(properties, "username", conf.get(CONNECTION_USERNAME));
+    setIfConfigured(properties, "password", conf.get(CONNECTION_PASSWORD));
+    setIfConfigured(properties, "driverClassName", conf.get(CONNECTION_DRIVER));
+
+    // Include hikari connection properties
+    properties.putAll(conf.getPropsWithPrefix(HIKARI_PROPS));
+
+    HikariConfig hikariConfig = new HikariConfig(properties);
+    this.dataSource = new HikariDataSource(hikariConfig);
+  }
+
+  /**
+   * Copies a configuration value into the Hikari properties, skipping values
+   * that are not configured. {@link Properties#setProperty(String, String)}
+   * rejects null values with a NullPointerException that carries no message,
+   * so absent settings are left for Hikari itself to validate.
+   * @param properties Properties being assembled for Hikari.
+   * @param name Hikari property name.
+   * @param value Configured value, or null when the setting is absent.
+   */
+  private static void setIfConfigured(Properties properties, String name, String value) {
+    if (value != null) {
+      properties.setProperty(name, value);
+    }
+  }
+
+  /**
+   * Obtains a connection from the Hikari data source.
+   * @return Connection provided by the data source.
+   * @throws SQLException If the data source cannot provide a connection.
+   */
+  @Override
+  public Connection getConnection() throws SQLException {
+    return dataSource.getConnection();
+  }
+
+  /**
+   * Closes the Hikari data source.
+   */
+  @Override
+  public void shutdown() {
+    // Close database connections
+    dataSource.close();
+  }
+
+  @VisibleForTesting
+  HikariDataSource getDataSource() {
+    return dataSource;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLConnectionFactory.java
new file mode 100644
index 00000000000..a464cc81968
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLConnectionFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import com.mysql.cj.jdbc.MysqlDataSource;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
+
+
+/**
+ * Interface to provide SQL connections to the {@link SQLDelegationTokenSecretManagerImpl}.
+ */
+public interface SQLConnectionFactory {
+  String CONNECTION_URL = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+      + "connection.url";
+  String CONNECTION_USERNAME = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+      + "connection.username";
+  String CONNECTION_PASSWORD = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+      + "connection.password";
+  String CONNECTION_DRIVER = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+      + "connection.driver";
+
+  /**
+   * Obtains a connection to the database.
+   * @return Connection that the caller is responsible for closing.
+   * @throws SQLException If a connection cannot be obtained.
+   */
+  Connection getConnection() throws SQLException;
+
+  /**
+   * Releases the resources held by this factory.
+   */
+  void shutdown();
+
+  /**
+   * Obtains a connection with the requested auto-commit mode.
+   * @param autocommit Auto-commit mode to set on the returned connection.
+   * @return Connection that the caller is responsible for closing.
+   * @throws SQLException If a connection cannot be obtained or its
+   *                      auto-commit mode cannot be changed.
+   */
+  default Connection getConnection(boolean autocommit) throws SQLException {
+    Connection connection = getConnection();
+    try {
+      connection.setAutoCommit(autocommit);
+    } catch (SQLException | RuntimeException e) {
+      // The caller never receives the connection on this path, so it must be
+      // closed here or it would leak.
+      try {
+        connection.close();
+      } catch (SQLException closeError) {
+        e.addSuppressed(closeError);
+      }
+      throw e;
+    }
+    return connection;
+  }
+}
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java
new file mode 100644
index 00000000000..7da54778f31
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of {@link SQLDelegationTokenSecretManager} that
+ * persists TokenIdentifiers and DelegationKeys in a SQL database.
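+ * This implementation obtains JDBC connections from a {@link SQLConnectionFactory},
+ * by default a {@link HikariDataSourceConnectionFactory}, which is configured with the
+ * connection.* and connection.hikari.* properties under the SQL secret manager prefix.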
+ */
+public class SQLDelegationTokenSecretManagerImpl
+    extends SQLDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SQLDelegationTokenSecretManagerImpl.class);
+  private static final String SEQ_NUM_COUNTER_FIELD = "sequenceNum";
+  private static final String SEQ_NUM_COUNTER_TABLE = "LastSequenceNum";
+  private static final String KEY_ID_COUNTER_FIELD = "keyId";
+  private static final String KEY_ID_COUNTER_TABLE = "LastDelegationKeyId";
+
+  private final SQLConnectionFactory connectionFactory;
+  private final DistributedSQLCounter sequenceNumCounter;
+  private final DistributedSQLCounter delegationKeyIdCounter;
+  private final SQLSecretManagerRetriableHandler retryHandler;
+
+  /**
+   * Creates a secret manager that uses a {@link HikariDataSourceConnectionFactory}
+   * and the retry handler built by {@link SQLSecretManagerRetriableHandlerImpl},
+   * both created from the given configuration.
+   * @param conf Configuration for the secret manager.
+   */
+  public SQLDelegationTokenSecretManagerImpl(Configuration conf) {
+    this(conf, new HikariDataSourceConnectionFactory(conf),
+        SQLSecretManagerRetriableHandlerImpl.getInstance(conf));
+  }
+
+  /**
+   * Creates a secret manager with the given connection factory and retry
+   * handler, then starts the threads of the parent class.
+   * @param conf Configuration for the secret manager.
+   * @param connectionFactory Factory used to obtain database connections.
+   * @param retryHandler Handler through which every SQL operation is executed.
+   * @throws RuntimeException If the parent class fails to start its threads.
+   */
+  public SQLDelegationTokenSecretManagerImpl(Configuration conf,
+      SQLConnectionFactory connectionFactory, SQLSecretManagerRetriableHandler retryHandler) {
+    super(conf);
+
+    this.connectionFactory = connectionFactory;
+    this.sequenceNumCounter = new DistributedSQLCounter(SEQ_NUM_COUNTER_FIELD,
+        SEQ_NUM_COUNTER_TABLE, connectionFactory);
+    this.delegationKeyIdCounter = new DistributedSQLCounter(KEY_ID_COUNTER_FIELD,
+        KEY_ID_COUNTER_TABLE, connectionFactory);
+    this.retryHandler = retryHandler;
+
+    try {
+      super.startThreads();
+    } catch (IOException e) {
+      throw new RuntimeException("Error starting threads for MySQL secret manager", e);
+    }
+
+    LOG.info("MySQL delegation token secret manager instantiated");
+  }
+
+  /**
+   * Creates an empty token identifier.
+   * @return New {@link DelegationTokenIdentifier}.
+   */
+  @Override
+  public DelegationTokenIdentifier createIdentifier() {
+    return new DelegationTokenIdentifier();
+  }
+
+  /**
+   * Stops the threads of the parent class and then shuts down the
+   * connection factory.
+   */
+  @Override
+  public void stopThreads() {
+    super.stopThreads();
+    connectionFactory.shutdown();
+  }
+
+  /**
+   * Inserts a record into the Tokens table.
+   * @param sequenceNum Sequence number of the token.
+   * @param tokenIdentifier Serialized token identifier.
+   * @param tokenInfo Serialized token information.
+   * @throws SQLException If the statement fails.
+   */
+  @Override
+  protected void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
+      throws SQLException {
+    retryHandler.execute(() -> {
+      try (Connection connection = connectionFactory.getConnection(true);
+          PreparedStatement statement = connection.prepareStatement(
+              "INSERT INTO Tokens (sequenceNum, tokenIdentifier, tokenInfo) VALUES (?, ?, ?)")) {
+        statement.setInt(1, sequenceNum);
+        statement.setBytes(2, tokenIdentifier);
+        statement.setBytes(3, tokenInfo);
+        statement.execute();
+      }
+    });
+  }
+
+  /**
+   * Replaces the tokenInfo of the Tokens record matching the given sequence
+   * number and token identifier.
+   * @param sequenceNum Sequence number of the token.
+   * @param tokenIdentifier Serialized token identifier.
+   * @param tokenInfo New serialized token information.
+   * @throws SQLException If the statement fails.
+   */
+  @Override
+  protected void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
+      throws SQLException {
+    retryHandler.execute(() -> {
+      try (Connection connection = connectionFactory.getConnection(true);
+          PreparedStatement statement = connection.prepareStatement(
+              "UPDATE Tokens SET tokenInfo = ? WHERE sequenceNum = ? AND tokenIdentifier = ?")) {
+        statement.setBytes(1, tokenInfo);
+        statement.setInt(2, sequenceNum);
+        statement.setBytes(3, tokenIdentifier);
+        statement.execute();
+      }
+    });
+  }
+
+  /**
+   * Deletes the Tokens record matching the given sequence number and
+   * token identifier.
+   * @param sequenceNum Sequence number of the token.
+   * @param tokenIdentifier Serialized token identifier.
+   * @throws SQLException If the statement fails.
+   */
+  @Override
+  protected void deleteToken(int sequenceNum, byte[] tokenIdentifier) throws SQLException {
+    retryHandler.execute(() -> {
+      try (Connection connection = connectionFactory.getConnection(true);
+          PreparedStatement statement = connection.prepareStatement(
+              "DELETE FROM Tokens WHERE sequenceNum = ? AND tokenIdentifier = ?")) {
+        statement.setInt(1, sequenceNum);
+        statement.setBytes(2, tokenIdentifier);
+        statement.execute();
+      }
+    });
+  }
+
+  /**
+   * Reads the tokenInfo of the Tokens record matching the given sequence
+   * number and token identifier.
+   * @param sequenceNum Sequence number of the token.
+   * @param tokenIdentifier Serialized token identifier.
+   * @return Serialized token information, or null if no record matches.
+   * @throws SQLException If the query fails.
+   */
+  @Override
+  protected byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier) throws SQLException {
+    return retryHandler.execute(() -> {
+      try (Connection connection = connectionFactory.getConnection();
+          PreparedStatement statement = connection.prepareStatement(
+              "SELECT tokenInfo FROM Tokens WHERE sequenceNum = ? AND tokenIdentifier = ?")) {
+        statement.setInt(1, sequenceNum);
+        statement.setBytes(2, tokenIdentifier);
+        try (ResultSet result = statement.executeQuery()) {
+          if (result.next()) {
+            return result.getBytes("tokenInfo");
+          }
+        }
+      }
+      return null;
+    });
+  }
+
+  /**
+   * Inserts a record into the DelegationKeys table.
+   * @param keyId Id of the delegation key.
+   * @param delegationKey Serialized delegation key.
+   * @throws SQLException If the statement fails.
+   */
+  @Override
+  protected void insertDelegationKey(int keyId, byte[] delegationKey) throws SQLException {
+    retryHandler.execute(() -> {
+      try (Connection connection = connectionFactory.getConnection(true);
+          PreparedStatement statement = connection.prepareStatement(
+              "INSERT INTO DelegationKeys (keyId, delegationKey) VALUES (?, ?)")) {
+        statement.setInt(1, keyId);
+        statement.setBytes(2, delegationKey);
+        statement.execute();
+      }
+    });
+  }
+
+  /**
+   * Replaces the delegationKey of the DelegationKeys record with the given id.
+   * @param keyId Id of the delegation key.
+   * @param delegationKey New serialized delegation key.
+   * @throws SQLException If the statement fails.
+   */
+  @Override
+  protected void updateDelegationKey(int keyId, byte[] delegationKey) throws SQLException {
+    retryHandler.execute(() -> {
+      try (Connection connection = connectionFactory.getConnection(true);
+          PreparedStatement statement = connection.prepareStatement(
+              "UPDATE DelegationKeys SET delegationKey = ? WHERE keyId = ?")) {
+        statement.setBytes(1, delegationKey);
+        statement.setInt(2, keyId);
+        statement.execute();
+      }
+    });
+  }
+
+  /**
+   * Deletes the DelegationKeys record with the given id.
+   * @param keyId Id of the delegation key.
+   * @throws SQLException If the statement fails.
+   */
+  @Override
+  protected void deleteDelegationKey(int keyId) throws SQLException {
+    retryHandler.execute(() -> {
+      try (Connection connection = connectionFactory.getConnection(true);
+          PreparedStatement statement = connection.prepareStatement(
+              "DELETE FROM DelegationKeys WHERE keyId = ?")) {
+        statement.setInt(1, keyId);
+        statement.execute();
+      }
+    });
+  }
+
+  /**
+   * Reads the delegationKey of the DelegationKeys record with the given id.
+   * @param keyId Id of the delegation key.
+   * @return Serialized delegation key, or null if no record matches.
+   * @throws SQLException If the query fails.
+   */
+  @Override
+  protected byte[] selectDelegationKey(int keyId) throws SQLException {
+    return retryHandler.execute(() -> {
+      try (Connection connection = connectionFactory.getConnection();
+          PreparedStatement statement = connection.prepareStatement(
+              "SELECT delegationKey FROM DelegationKeys WHERE keyId = ?")) {
+        statement.setInt(1, keyId);
+        try (ResultSet result = statement.executeQuery()) {
+          if (result.next()) {
+            return result.getBytes("delegationKey");
+          }
+        }
+      }
+      return null;
+    });
+  }
+
+  /**
+   * Reads the current value of the sequence number counter.
+   * @return Sequence number counter value.
+   * @throws SQLException If the counter cannot be read.
+   */
+  @Override
+  protected int selectSequenceNum() throws SQLException {
+    return retryHandler.execute(() -> sequenceNumCounter.selectCounterValue());
+  }
+
+  /**
+   * Sets the sequence number counter to the given value.
+   * @param value Value to assign to the counter.
+   * @throws SQLException If the counter cannot be updated.
+   */
+  @Override
+  protected void updateSequenceNum(int value) throws SQLException {
+    retryHandler.execute(() -> sequenceNumCounter.updateCounterValue(value));
+  }
+
+  /**
+   * Increments the sequence number counter by the given amount.
+   * @param amount Amount to increase the counter.
+   * @return Value returned by the counter, documented there as the previous
+   *         counter value.
+   * @throws SQLException If the counter cannot be incremented.
+   */
+  @Override
+  protected int incrementSequenceNum(int amount) throws SQLException {
+    return retryHandler.execute(() -> sequenceNumCounter.incrementCounterValue(amount));
+  }
+
+  /**
+   * Reads the current value of the delegation key id counter.
+   * @return Delegation key id counter value.
+   * @throws SQLException If the counter cannot be read.
+   */
+  @Override
+  protected int selectKeyId() throws SQLException {
+    return retryHandler.execute(delegationKeyIdCounter::selectCounterValue);
+  }
+
+  /**
+   * Sets the delegation key id counter to the given value.
+   * @param value Value to assign to the counter.
+   * @throws SQLException If the counter cannot be updated.
+   */
+  @Override
+  protected void updateKeyId(int value) throws SQLException {
+    retryHandler.execute(() -> delegationKeyIdCounter.updateCounterValue(value));
+  }
+
+  /**
+   * Increments the delegation key id counter by the given amount.
+   * @param amount Amount to increase the counter.
+   * @return Value returned by the counter, documented there as the previous
+   *         counter value.
+   * @throws SQLException If the counter cannot be incremented.
+   */
+  @Override
+  protected int incrementKeyId(int amount) throws SQLException {
+    return retryHandler.execute(() -> delegationKeyIdCounter.incrementCounterValue(amount));
+  }
+
+  /**
+   * Exposes the connection factory used by this secret manager.
+   * @return Connection factory in use.
+   */
+  @VisibleForTesting
+  protected SQLConnectionFactory getConnectionFactory() {
+    return connectionFactory;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLSecretManagerRetriableHandler.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLSecretManagerRetriableHandler.java
new file mode 100644
index 00000000000..16151226217
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLSecretManagerRetriableHandler.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Interface to handle retries when {@link SQLDelegationTokenSecretManagerImpl}
+ * throws expected errors.
+ */
+public interface SQLSecretManagerRetriableHandler {
+  /**
+   * Executes a SQL command that produces no result.
+   * @param command SQL command to execute.
+   * @throws SQLException When the command fails.
+   */
+  void execute(SQLCommandVoid command) throws SQLException;
+
+  /**
+   * Executes a SQL command and returns its result.
+   * @param command SQL command to execute.
+   * @param <T> Type of the result produced by the command.
+   * @return Result produced by the command.
+   * @throws SQLException When the command fails.
+   */
+  <T> T execute(SQLCommand<T> command) throws SQLException;
+
+  /**
+   * SQL command that produces no result.
+   */
+  @FunctionalInterface
+  interface SQLCommandVoid {
+    void doCall() throws SQLException;
+  }
+
+  /**
+   * SQL command that produces a result.
+   * @param <T> Type of the result.
+   */
+  @FunctionalInterface
+  interface SQLCommand<T> {
+    T doCall() throws SQLException;
+  }
+}
+
+/**
+ * Implementation of {@link SQLSecretManagerRetriableHandler} that uses a
+ * {@link RetryProxy} to simplify the retryable operations.
+ */
+class SQLSecretManagerRetriableHandlerImpl implements SQLSecretManagerRetriableHandler {
+  public final static String MAX_RETRIES =
+      SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + "max-retries";
+  public final static int MAX_RETRIES_DEFAULT = 0;
+  public final static String RETRY_SLEEP_TIME_MS =
+      SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + "retry-sleep-time-ms";
+  public final static long RETRY_SLEEP_TIME_MS_DEFAULT = 100;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SQLSecretManagerRetriableHandlerImpl.class);
+
+  static SQLSecretManagerRetriableHandler getInstance(Configuration conf) {
+    return getInstance(conf, new SQLSecretManagerRetriableHandlerImpl());
+  }
+
+  static SQLSecretManagerRetriableHandler getInstance(Configuration conf,
+      SQLSecretManagerRetriableHandlerImpl retryHandler) {
+    RetryPolicy basePolicy = RetryPolicies.exponentialBackoffRetry(
+        conf.getInt(MAX_RETRIES, MAX_RETRIES_DEFAULT),
+        conf.getLong(RETRY_SLEEP_TIME_MS, RETRY_SLEEP_TIME_MS_DEFAULT),
+        TimeUnit.MILLISECONDS);
+
+    // Configure SQLSecretManagerRetriableException to retry with exponential backoff
+    Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = new HashMap<>();
+    exceptionToPolicyMap.put(SQLSecretManagerRetriableException.class, basePolicy);
+
+    // Configure all other exceptions to fail after one attempt
+    RetryPolicy retryPolicy = RetryPolicies.retryByException(
+        RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+
+    return (SQLSecretManagerRetriableHandler) RetryProxy.create(
+        SQLSecretManagerRetriableHandler.class, retryHandler, retryPolicy);
+  }
+
+  /**
+   * Executes a SQL command and raises retryable errors as
+   * {@link SQLSecretManagerRetriableException}s so they are recognized by the
+   * {@link RetryProxy}.
+   * @param command SQL command to execute
+   * @throws SQLException When SQL connection errors occur
+   */
+  @Override
+  public void execute(SQLCommandVoid command) throws SQLException {
+    try {
+      command.doCall();
+    } catch (SQLException e) {
+      LOG.warn("Failed to execute SQL command", e);
+      throw new SQLSecretManagerRetriableException(e);
+    }
+  }
+
+  /**
+   * Executes a SQL command and raises retryable errors as
+   * {@link SQLSecretManagerRetriableException}s so they are recognized by the
+   * {@link RetryProxy}.
+   * @param command SQL command to execute
+   * @throws SQLException When SQL connection errors occur
+   */
+  @Override
+  public <T> T execute(SQLCommand<T> command) throws SQLException {
+    try {
+      return command.doCall();
+    } catch (SQLException e) {
+      LOG.warn("Failed to execute SQL command", e);
+      throw new SQLSecretManagerRetriableException(e);
+    }
+  }
+
+  /**
+   * Class used to identify errors that can be retried.
+   */
+  static class SQLSecretManagerRetriableException extends SQLException {
+    SQLSecretManagerRetriableException(Throwable cause) {
+      super(cause);
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java
new file mode 100644
index 00000000000..569a274042b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestSQLDelegationTokenSecretManagerImpl {
+  private static final String CONNECTION_URL = "jdbc:derby:memory:TokenStore";
+  private static final int TEST_MAX_RETRIES = 3;
+  private static Configuration conf;
+
+  /**
+   * Creates the token store tables before each test.
+   * @throws SQLException if creating the tables fails
+   */
+  @Before
+  public void init() throws SQLException {
+    createTestDBTables();
+  }
+
+  /**
+   * Drops the token store tables after each test so the next test starts
+   * from freshly created tables.
+   * @throws SQLException if dropping the tables fails
+   */
+  @After
+  public void cleanup() throws SQLException {
+    dropTestDBTables();
+  }
+
+  /**
+   * Creates the in-memory test database and builds the configuration shared
+   * by all tests in this class.
+   * @throws SQLException if the database cannot be created
+   */
+  @BeforeClass
+  public static void initDatabase() throws SQLException {
+    // Connecting with create=true is what creates the database. The connection
+    // itself is not used afterwards, so close it instead of leaking it.
+    try (Connection ignored = DriverManager.getConnection(CONNECTION_URL + ";create=true")) {
+      // Nothing to do: the database now exists.
+    }
+
+    conf = new Configuration();
+    conf.set(SQLConnectionFactory.CONNECTION_URL, CONNECTION_URL);
+    conf.set(SQLConnectionFactory.CONNECTION_USERNAME, "testuser");
+    conf.set(SQLConnectionFactory.CONNECTION_PASSWORD, "testpassword");
+    conf.set(SQLConnectionFactory.CONNECTION_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver");
+    conf.setInt(SQLSecretManagerRetriableHandlerImpl.MAX_RETRIES, TEST_MAX_RETRIES);
+    conf.setInt(SQLSecretManagerRetriableHandlerImpl.RETRY_SLEEP_TIME_MS, 10);
+  }
+
+  @AfterClass
+  public static void cleanupDatabase() {
+    try {
+      DriverManager.getConnection(CONNECTION_URL + ";drop=true");
+    } catch (SQLException e) {
+      // SQLException expected when database is dropped
+      if (!e.getMessage().contains("dropped")) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
+   * A token issued by a token manager can be validated by that same manager.
+   */
+  @Test
+  public void testSingleSecretManager() throws Exception {
+    DelegationTokenManager manager = createTokenManager();
+    try {
+      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+      Token<? extends AbstractDelegationTokenIdentifier> issuedToken =
+          manager.createToken(currentUser, "foo");
+      validateToken(manager, issuedToken);
+    } finally {
+      stopTokenManager(manager);
+    }
+  }
+
+  /**
+   * A token issued by one token manager can be validated by another manager
+   * that shares the same database.
+   */
+  @Test
+  public void testMultipleSecretManagers() throws Exception {
+    DelegationTokenManager firstManager = createTokenManager();
+    DelegationTokenManager secondManager = createTokenManager();
+
+    try {
+      Token<? extends AbstractDelegationTokenIdentifier> tokenFromFirst =
+          firstManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
+      Token<? extends AbstractDelegationTokenIdentifier> tokenFromSecond =
+          secondManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
+
+      // Each manager validates the token issued by the other one
+      validateToken(firstManager, tokenFromSecond);
+      validateToken(secondManager, tokenFromFirst);
+    } finally {
+      stopTokenManager(firstManager);
+      stopTokenManager(secondManager);
+    }
+  }
+
+  /**
+   * Three token managers sharing one database must hand out unique sequence
+   * numbers, and batches of sequence numbers must be allocated to the managers
+   * in the order in which they request them.
+   */
+  @Test
+  public void testSequenceNumAllocation() throws Exception {
+    int batchSize = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE;
+    int tokensPerManager = batchSize * 5;
+    int totalTokens = tokensPerManager * 3;
+    Set<Integer> sequenceNums1 = new HashSet<>();
+    Set<Integer> sequenceNums2 = new HashSet<>();
+    Set<Integer> sequenceNums3 = new HashSet<>();
+    Set<Integer> sequenceNums = new HashSet<>();
+    DelegationTokenManager tokenManager1 = createTokenManager();
+    DelegationTokenManager tokenManager2 = createTokenManager();
+    DelegationTokenManager tokenManager3 = createTokenManager();
+
+    try {
+      // Managers take turns issuing tokens so batches are requested in a fixed order
+      for (int i = 0; i < tokensPerManager; i++) {
+        allocateSequenceNum(tokenManager1, sequenceNums1);
+        allocateSequenceNum(tokenManager2, sequenceNums2);
+        allocateSequenceNum(tokenManager3, sequenceNums3);
+      }
+      // Merge once, after every token has been issued
+      sequenceNums.addAll(sequenceNums1);
+      sequenceNums.addAll(sequenceNums2);
+      sequenceNums.addAll(sequenceNums3);
+
+      Assert.assertEquals("Verify that all tokens were created with unique sequence numbers",
+          totalTokens, sequenceNums.size());
+      Assert.assertEquals("Verify that tokenManager1 generated unique sequence numbers",
+          tokensPerManager, sequenceNums1.size());
+      Assert.assertEquals("Verify that tokenManager2 generated unique sequence number",
+          tokensPerManager, sequenceNums2.size());
+      Assert.assertEquals("Verify that tokenManager3 generated unique sequence numbers",
+          tokensPerManager, sequenceNums3.size());
+
+      // Validate sequence number batches allocated in order to each token manager.
+      // Each pass covers three batches, so iterate over every issued sequence number.
+      for (int seqNum = 1; seqNum <= totalTokens;) {
+        // First batch allocated tokenManager1
+        for (int i = 0; i < batchSize; i++, seqNum++) {
+          Assert.assertTrue(sequenceNums1.contains(seqNum));
+        }
+        // Second batch allocated tokenManager2
+        for (int i = 0; i < batchSize; i++, seqNum++) {
+          Assert.assertTrue(sequenceNums2.contains(seqNum));
+        }
+        // Third batch allocated tokenManager3
+        for (int i = 0; i < batchSize; i++, seqNum++) {
+          Assert.assertTrue(sequenceNums3.contains(seqNum));
+        }
+      }
+
+      SQLDelegationTokenSecretManagerImpl secretManager =
+          (SQLDelegationTokenSecretManagerImpl) tokenManager1.getDelegationTokenSecretManager();
+      Assert.assertEquals("Verify that the counter is set to the highest sequence number",
+          totalTokens, secretManager.getDelegationTokenSeqNum());
+    } finally {
+      stopTokenManager(tokenManager1);
+      stopTokenManager(tokenManager2);
+      stopTokenManager(tokenManager3);
+    }
+  }
+
+  /**
+   * Sequence numbers keep being allocated consecutively up to
+   * {@link Integer#MAX_VALUE} and then restart from 1.
+   */
+  @Test
+  public void testSequenceNumRollover() throws Exception {
+    int tokenBatch = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE;
+    Set<Integer> allocated = new HashSet<>();
+
+    DelegationTokenManager manager = createTokenManager();
+
+    try {
+      SQLDelegationTokenSecretManagerImpl sqlSecretManager =
+          (SQLDelegationTokenSecretManagerImpl) manager.getDelegationTokenSecretManager();
+      int lastIssued = Integer.MAX_VALUE - tokenBatch;
+      sqlSecretManager.setDelegationTokenSeqNum(lastIssued);
+
+      // Allocate the sequence numbers that remain before the rollover
+      for (int offset = 1; offset <= tokenBatch; offset++) {
+        allocateSequenceNum(manager, allocated);
+        Assert.assertTrue(allocated.contains(lastIssued + offset));
+      }
+
+      // After the rollover, allocation starts again from 1
+      for (int expectedSeqNum = 1; expectedSeqNum <= tokenBatch; expectedSeqNum++) {
+        allocateSequenceNum(manager, allocated);
+        Assert.assertTrue(allocated.contains(expectedSeqNum));
+      }
+    } finally {
+      stopTokenManager(manager);
+    }
+  }
+
+  /**
+   * Each secret manager uses its own delegation key, and keeps using that key
+   * for its tokens after another secret manager has started.
+   */
+  @Test
+  public void testDelegationKeyAllocation() throws Exception {
+    DelegationTokenManager firstManager = createTokenManager();
+
+    try {
+      TestDelegationTokenSecretManager firstSecretManager =
+          (TestDelegationTokenSecretManager) firstManager.getDelegationTokenSecretManager();
+      // Hold the key-roll lock for the rest of the test to avoid race conditions
+      // between the keys generated and the active keys in the database.
+      firstSecretManager.lockKeyRoll();
+      int firstKeyId = firstSecretManager.getCurrentKeyId();
+
+      // Tokens from the first manager carry the first manager's key
+      validateKeyId(
+          firstManager.createToken(UserGroupInformation.getCurrentUser(), "foo"), firstKeyId);
+
+      DelegationTokenManager secondManager = createTokenManager();
+      try {
+        TestDelegationTokenSecretManager secondSecretManager =
+            (TestDelegationTokenSecretManager) secondManager.getDelegationTokenSecretManager();
+        // Keep the second manager's key from rolling for the rest of the test
+        secondSecretManager.lockKeyRoll();
+        int secondKeyId = secondSecretManager.getCurrentKeyId();
+
+        Assert.assertNotEquals("Each secret manager has its own key", firstKeyId, secondKeyId);
+
+        // Tokens from the second manager carry the second manager's key
+        validateKeyId(
+            secondManager.createToken(UserGroupInformation.getCurrentUser(), "foo"), secondKeyId);
+
+        // The first manager still signs with its own key
+        validateKeyId(
+            firstManager.createToken(UserGroupInformation.getCurrentUser(), "foo"), firstKeyId);
+
+        // The second manager still signs with its own key
+        validateKeyId(
+            secondManager.createToken(UserGroupInformation.getCurrentUser(), "foo"), secondKeyId);
+      } finally {
+        stopTokenManager(secondManager);
+      }
+    } finally {
+      stopTokenManager(firstManager);
+    }
+  }
+
+  /**
+   * Properties set under {@link HikariDataSourceConnectionFactory#HIKARI_PROPS}
+   * must be applied to the created data source.
+   */
+  @Test
+  public void testHikariConfigs() {
+    HikariDataSourceConnectionFactory factory1 = new HikariDataSourceConnectionFactory(conf);
+    int defaultMaximumPoolSize;
+    try {
+      defaultMaximumPoolSize = factory1.getDataSource().getMaximumPoolSize();
+    } finally {
+      factory1.shutdown();
+    }
+
+    // Changing default maximumPoolSize
+    Configuration hikariConf = new Configuration(conf);
+    hikariConf.setInt(HikariDataSourceConnectionFactory.HIKARI_PROPS + "maximumPoolSize",
+        defaultMaximumPoolSize + 1);
+
+    // Verifying property is correctly set in datasource
+    HikariDataSourceConnectionFactory factory2 = new HikariDataSourceConnectionFactory(hikariConf);
+    try {
+      // Expected value goes first so a failure message reads correctly
+      Assert.assertEquals(defaultMaximumPoolSize + 1,
+          factory2.getDataSource().getMaximumPoolSize());
+    } finally {
+      // Shut the pool down even when the assertion fails
+      factory2.shutdown();
+    }
+  }
+
+  @Test
+  public void testRetries() throws Exception {
+    DelegationTokenManager tokenManager = createTokenManager();
+    TestDelegationTokenSecretManager secretManager =
+        (TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager();
+
+    try {
+      // Prevent delegation keys to roll for the rest of the test
+      secretManager.lockKeyRoll();
+
+      // Reset counter and expect a single request when inserting a token
+      TestRetryHandler.resetExecutionAttemptCounter();
+      tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
+      Assert.assertEquals(1, TestRetryHandler.getExecutionAttempts());
+
+      // Breaking database connections to cause retries
+      secretManager.setReadOnly(true);
+
+      // Reset counter and expect a multiple retries when failing to insert a token
+      TestRetryHandler.resetExecutionAttemptCounter();
+      tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
+      Assert.assertEquals(TEST_MAX_RETRIES + 1, TestRetryHandler.getExecutionAttempts());
+    } finally {
+      // Fix database connections
+      secretManager.setReadOnly(false);
+      stopTokenManager(tokenManager);
+    }
+  }
+
+  /**
+   * Builds a token manager that is backed by a new
+   * {@link TestDelegationTokenSecretManager}.
+   * @return the configured token manager
+   */
+  private DelegationTokenManager createTokenManager() {
+    DelegationTokenManager manager = new DelegationTokenManager(new Configuration(), null);
+    TestDelegationTokenSecretManager testSecretManager = new TestDelegationTokenSecretManager();
+    manager.setExternalDelegationTokenSecretManager(testSecretManager);
+    return manager;
+  }
+
+  /**
+   * Creates a token, asserts that its sequence number has not been seen
+   * before and records it.
+   * @param tokenManager manager used to create the token
+   * @param sequenceNums sequence numbers seen so far; the new one is added
+   * @throws IOException if the token cannot be created or decoded
+   */
+  private void allocateSequenceNum(DelegationTokenManager tokenManager, Set<Integer> sequenceNums)
+      throws IOException {
+    Token<? extends AbstractDelegationTokenIdentifier> issuedToken =
+        tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
+    int issuedSeqNum = issuedToken.decodeIdentifier().getSequenceNumber();
+
+    boolean alreadySeen = sequenceNums.contains(issuedSeqNum);
+    Assert.assertFalse("Verify sequence number is unique", alreadySeen);
+
+    sequenceNums.add(issuedSeqNum);
+  }
+
+  /**
+   * Walks a token through verify, renew and cancel, checking the row stored
+   * in the database after each step.
+   * @param tokenManager manager used to verify, renew and cancel the token
+   * @param token token to validate
+   * @throws Exception if any token operation fails
+   */
+  private void validateToken(DelegationTokenManager tokenManager,
+      Token<? extends AbstractDelegationTokenIdentifier> token)
+      throws Exception {
+    SQLDelegationTokenSecretManagerImpl sqlSecretManager =
+        (SQLDelegationTokenSecretManagerImpl) tokenManager.getDelegationTokenSecretManager();
+    AbstractDelegationTokenIdentifier identifier = token.decodeIdentifier();
+
+    // The token must verify and have a row in the database
+    tokenManager.verifyToken(token);
+    byte[] infoAfterCreate = sqlSecretManager.selectTokenInfo(
+        identifier.getSequenceNumber(), identifier.getBytes());
+    Assert.assertNotNull("Verify token exists in database", infoAfterCreate);
+
+    // Renewing must keep the row but change its content
+    tokenManager.renewToken(token, "foo");
+    byte[] infoAfterRenew = sqlSecretManager.selectTokenInfo(
+        identifier.getSequenceNumber(), identifier.getBytes());
+    Assert.assertNotNull("Verify token exists in database", infoAfterRenew);
+    boolean unchangedByRenew = Arrays.equals(infoAfterCreate, infoAfterRenew);
+    Assert.assertFalse("Verify token has been updated in database", unchangedByRenew);
+
+    // Cancelling must remove the row
+    tokenManager.cancelToken(token, "foo");
+    byte[] infoAfterCancel = sqlSecretManager.selectTokenInfo(
+        identifier.getSequenceNumber(), identifier.getBytes());
+    Assert.assertNull("Verify token was removed from database", infoAfterCancel);
+  }
+
+  /**
+   * Asserts that a token was created with the expected master key.
+   * @param token token whose identifier is inspected
+   * @param expectedKeyId master key id the token should carry
+   * @throws IOException if the token identifier cannot be decoded
+   */
+  private void validateKeyId(Token<? extends AbstractDelegationTokenIdentifier> token,
+      int expectedKeyId) throws IOException {
+    AbstractDelegationTokenIdentifier tokenIdentifier = token.decodeIdentifier();
+    // Expected value goes first so a failure message reads correctly
+    Assert.assertEquals("Verify that keyId is assigned to token",
+        expectedKeyId, tokenIdentifier.getMasterKeyId());
+  }
+
+  /**
+   * Opens a new connection to the test database.
+   * @return an open connection that the caller must close
+   * @throws RuntimeException wrapping the {@link SQLException} if the
+   *     connection cannot be opened
+   */
+  private static Connection getTestDBConnection() {
+    try {
+      return DriverManager.getConnection(CONNECTION_URL);
+    } catch (SQLException e) {
+      // Only the checked failure needs wrapping; unchecked ones propagate as they are
+      throw new RuntimeException("Failed to connect to " + CONNECTION_URL, e);
+    }
+  }
+
+  /**
+   * Creates the tables used by the token store and seeds the sequence number
+   * and key id counters with 0.
+   * @throws SQLException if any statement fails
+   */
+  private static void createTestDBTables() throws SQLException {
+    String[] setupStatements = {
+        "CREATE TABLE Tokens (sequenceNum INT NOT NULL, "
+            + "tokenIdentifier VARCHAR (255) FOR BIT DATA NOT NULL, "
+            + "tokenInfo VARCHAR (255) FOR BIT DATA NOT NULL, "
+            + "modifiedTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
+            + "PRIMARY KEY(sequenceNum))",
+        "CREATE TABLE DelegationKeys (keyId INT NOT NULL, "
+            + "delegationKey VARCHAR (255) FOR BIT DATA NOT NULL, "
+            + "modifiedTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
+            + "PRIMARY KEY(keyId))",
+        "CREATE TABLE LastSequenceNum (sequenceNum INT NOT NULL)",
+        "INSERT INTO LastSequenceNum VALUES (0)",
+        "CREATE TABLE LastDelegationKeyId (keyId INT NOT NULL)",
+        "INSERT INTO LastDelegationKeyId VALUES (0)",
+    };
+    // Statements run in order; the first failure aborts the remaining ones
+    for (String setupStatement : setupStatements) {
+      execute(setupStatement);
+    }
+  }
+
+  /**
+   * Drops every table created by {@code createTestDBTables}.
+   * @throws SQLException if any statement fails
+   */
+  private static void dropTestDBTables() throws SQLException {
+    String[] dropStatements = {
+        "DROP TABLE Tokens",
+        "DROP TABLE DelegationKeys",
+        "DROP TABLE LastSequenceNum",
+        "DROP TABLE LastDelegationKeyId",
+    };
+    for (String dropStatement : dropStatements) {
+      execute(dropStatement);
+    }
+  }
+
+  /**
+   * Runs a single SQL statement against the test database on a dedicated
+   * connection.
+   * @param statement SQL text to execute
+   * @throws SQLException if the statement fails
+   */
+  private static void execute(String statement) throws SQLException {
+    // Close the Statement explicitly instead of relying on the connection to do it
+    try (Connection connection = getTestDBConnection();
+        Statement sqlStatement = connection.createStatement()) {
+      sqlStatement.execute(statement);
+    }
+  }
+
+  /**
+   * Releases the key-roll lock if the calling thread holds it and stops the
+   * threads of the secret manager behind the given token manager.
+   * @param tokenManager token manager to shut down
+   */
+  private void stopTokenManager(DelegationTokenManager tokenManager) {
+    TestDelegationTokenSecretManager testSecretManager =
+        (TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager();
+    // Let go of the key-roll lock before shutting down
+    testSecretManager.unlockKeyRoll();
+    // Stopping the threads closes the database connections
+    testSecretManager.stopThreads();
+  }
+
+  /**
+   * Secret manager used by the tests. It is wired to a
+   * {@link TestConnectionFactory} and a {@link TestRetryHandler}, and it
+   * guards {@link #rollMasterKey()} with a lock that tests can hold to keep
+   * keys from being rolled.
+   */
+  static class TestDelegationTokenSecretManager extends SQLDelegationTokenSecretManagerImpl {
+    // Created lazily by getKeyRollLock().
+    // NOTE(review): presumably lazy so the lock exists even if rollMasterKey()
+    // is reached before this class' field initializers have run - confirm.
+    private ReentrantLock keyRollLock;
+
+    /**
+     * Returns the key-roll lock, creating it on first use. Synchronized so
+     * that concurrent first calls end up sharing one lock instance.
+     */
+    private synchronized ReentrantLock getKeyRollLock() {
+      if (keyRollLock == null) {
+        keyRollLock = new ReentrantLock();
+      }
+      return keyRollLock;
+    }
+
+    /**
+     * Builds the secret manager from the shared test configuration.
+     */
+    TestDelegationTokenSecretManager() {
+      super(conf, new TestConnectionFactory(conf),
+          SQLSecretManagerRetriableHandlerImpl.getInstance(conf, new TestRetryHandler()));
+    }
+
+    /**
+     * Acquires the key-roll lock. Tests can call this method to prevent
+     * delegation keys from being rolled by another thread in the middle of a
+     * test, which avoids race conditions.
+     */
+    public void lockKeyRoll() {
+      getKeyRollLock().lock();
+    }
+
+    /**
+     * Releases one hold on the key-roll lock if the calling thread holds it;
+     * otherwise does nothing, so it is safe to call unconditionally.
+     */
+    public void unlockKeyRoll() {
+      if (getKeyRollLock().isHeldByCurrentThread()) {
+        getKeyRollLock().unlock();
+      }
+    }
+
+    /**
+     * Rolls the master key while holding the key-roll lock, so the roll waits
+     * for as long as a different thread holds the lock. The lock is reentrant:
+     * a thread that already holds it can still roll keys, and the
+     * {@code finally} block releases only the hold taken here.
+     */
+    @Override
+    protected void rollMasterKey() throws IOException {
+      try {
+        lockKeyRoll();
+        super.rollMasterKey();
+      } finally {
+        unlockKeyRoll();
+      }
+    }
+
+    /**
+     * Sets the read-only flag of the {@link TestConnectionFactory}, which
+     * applies it to the connections it hands out afterwards.
+     * @param readOnly whether connections should be read-only
+     */
+    public void setReadOnly(boolean readOnly) {
+      ((TestConnectionFactory) getConnectionFactory()).readOnly = readOnly;
+    }
+  }
+
+  /**
+   * Connection factory used by the tests. Every connection it hands out is
+   * switched to the "APP" schema and given the current read-only setting.
+   */
+  static class TestConnectionFactory extends HikariDataSourceConnectionFactory {
+    // Toggled by tests through TestDelegationTokenSecretManager#setReadOnly
+    private boolean readOnly = false;
+
+    TestConnectionFactory(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public Connection getConnection() throws SQLException {
+      Connection pooledConnection = super.getConnection();
+      // The derby driver looks for the user schema, so select the default one
+      pooledConnection.setSchema("APP");
+      pooledConnection.setReadOnly(readOnly);
+      return pooledConnection;
+    }
+  }
+
+  /**
+   * Retry handler that counts how many times a SQL command without a result
+   * is executed. Commands that return a value are not counted.
+   */
+  static class TestRetryHandler extends SQLSecretManagerRetriableHandlerImpl {
+    // Tracks the amount of times that a SQL command is executed, regardless of
+    // whether it completed successfully or not. A single shared instance is
+    // kept so that an increment racing with a reset is never applied to a
+    // counter that has already been discarded.
+    private static final AtomicInteger EXECUTION_ATTEMPT_COUNTER = new AtomicInteger();
+
+    /** Sets the number of recorded execution attempts back to zero. */
+    static void resetExecutionAttemptCounter() {
+      EXECUTION_ATTEMPT_COUNTER.set(0);
+    }
+
+    /**
+     * @return number of execution attempts recorded since the last reset
+     */
+    static int getExecutionAttempts() {
+      return EXECUTION_ATTEMPT_COUNTER.get();
+    }
+
+    @Override
+    public void execute(SQLCommandVoid command) throws SQLException {
+      // Count the attempt before running it so failed attempts are included
+      EXECUTION_ATTEMPT_COUNTER.incrementAndGet();
+      super.execute(command);
+    }
+  }
+}
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 7f9e7a78e52..d26e92d6b2d 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -133,6 +133,8 @@
     <jcache.version>1.0-alpha-1</jcache.version>
     <ehcache.version>3.3.1</ehcache.version>
     <hikari.version>2.4.12</hikari.version>
+    <derby.version>10.10.2.0</derby.version>
+    <mysql-connector-java.version>8.0.29</mysql-connector-java.version>
     <mssql.version>6.2.1.jre7</mssql.version>
     <okhttp3.version>4.9.3</okhttp3.version>
     <kotlin-stdlib.verion>1.4.10</kotlin-stdlib.verion>
@@ -1869,6 +1871,16 @@
           <artifactId>HikariCP-java7</artifactId>
           <version>${hikari.version}</version>
         </dependency>
+        <dependency>
+          <groupId>org.apache.derby</groupId>
+          <artifactId>derby</artifactId>
+          <version>${derby.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>mysql</groupId>
+          <artifactId>mysql-connector-java</artifactId>
+          <version>${mysql-connector-java.version}</version>
+        </dependency>
         <dependency>
           <groupId>com.microsoft.sqlserver</groupId>
           <artifactId>mssql-jdbc</artifactId>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org