You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2023/02/22 18:40:09 UTC
[hadoop] branch trunk updated: HADOOP-18535. Implement token storage solution based on MySQL
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new fb31393b65b HADOOP-18535. Implement token storage solution based on MySQL
fb31393b65b is described below
commit fb31393b65b7a877ec256678e81a0f852b3cf14d
Author: hchaverr <hc...@linkedin.com>
AuthorDate: Tue Jan 24 10:43:36 2023 -0800
HADOOP-18535. Implement token storage solution based on MySQL
Fixes #1240
Signed-off-by: Owen O'Malley <oo...@linkedin.com>
---
.../AbstractDelegationTokenSecretManager.java | 36 +-
.../SQLDelegationTokenSecretManager.java | 400 +++++++++++++++++
hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml | 22 +
.../TokenStore/MySQL/TokenStoreDatabase.sql | 21 +
.../scripts/TokenStore/MySQL/TokenStoreTables.sql | 52 +++
.../scripts/TokenStore/MySQL/TokenStoreUser.sql | 26 ++
.../hadoop-hdfs-rbf/scripts/TokenStore/README | 24 ++
.../security/token/DistributedSQLCounter.java | 138 ++++++
.../token/HikariDataSourceConnectionFactory.java | 68 +++
.../security/token/SQLConnectionFactory.java | 54 +++
.../token/SQLDelegationTokenSecretManagerImpl.java | 242 +++++++++++
.../token/SQLSecretManagerRetriableHandler.java | 133 ++++++
.../TestSQLDelegationTokenSecretManagerImpl.java | 471 +++++++++++++++++++++
hadoop-project/pom.xml | 12 +
14 files changed, 1697 insertions(+), 2 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
index 8aaf9bbd8de..cde4cf48413 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
@@ -19,7 +19,9 @@
package org.apache.hadoop.security.token.delegation;
import java.io.ByteArrayInputStream;
+import java.io.DataInput;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.ArrayList;
@@ -41,6 +43,8 @@ import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -441,8 +445,9 @@ extends AbstractDelegationTokenIdentifier>
/**
* Update the current master key for generating delegation tokens
* It should be called only by tokenRemoverThread.
+ * @throws IOException raised on errors performing I/O.
*/
- void rollMasterKey() throws IOException {
+ protected void rollMasterKey() throws IOException {
synchronized (this) {
removeExpiredKeys();
/* set final expiry date for retiring currentKey */
@@ -677,11 +682,15 @@ extends AbstractDelegationTokenIdentifier>
/** Class to encapsulate a token's renew date and password. */
@InterfaceStability.Evolving
- public static class DelegationTokenInformation {
+ public static class DelegationTokenInformation implements Writable {
long renewDate;
byte[] password;
String trackingId;
+ /**
+ * Creates an instance with a renew date of 0 and a null password by
+ * delegating to the two-argument constructor.
+ */
+ public DelegationTokenInformation() {
+ this(0, null);
+ }
+
public DelegationTokenInformation(long renewDate, byte[] password) {
this(renewDate, password, null);
}
@@ -711,6 +720,29 @@ extends AbstractDelegationTokenIdentifier>
public String getTrackingId() {
return trackingId;
}
+
+ /**
+ * Serializes this token information. The layout written is: the renew date
+ * as a variable-length long, the password length as a variable-length int
+ * (-1 when the password is null) followed by the raw password bytes, and
+ * finally the tracking id written with WritableUtils.writeString.
+ * @param out Destination of the serialized fields.
+ * @throws IOException raised on errors performing I/O.
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ WritableUtils.writeVLong(out, renewDate);
+ if (password == null) {
+ // A length of -1 marks a null password; no password bytes follow it.
+ WritableUtils.writeVInt(out, -1);
+ } else {
+ WritableUtils.writeVInt(out, password.length);
+ out.write(password);
+ }
+ WritableUtils.writeString(out, trackingId);
+ }
+
+    /**
+     * Populates this instance from its serialized form: the renew date as a
+     * variable-length long, the password length as a variable-length int
+     * (a negative value marks a null password) followed by the password
+     * bytes, and the tracking id read with WritableUtils.readString.
+     * @param in Source of the serialized fields.
+     * @throws IOException raised on errors performing I/O.
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      renewDate = WritableUtils.readVLong(in);
+      int len = WritableUtils.readVInt(in);
+      if (len > -1) {
+        password = new byte[len];
+        in.readFully(password);
+      } else {
+        // Clear any previous value so that a reused instance does not keep
+        // a stale password when the serialized record carries none.
+        password = null;
+      }
+      trackingId = WritableUtils.readString(in);
+    }
}
/** Remove expired delegation tokens from cache */
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java
new file mode 100644
index 00000000000..4b6ae21d7a9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token.delegation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.sql.SQLException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of {@link AbstractDelegationTokenSecretManager} that
+ * persists TokenIdentifiers and DelegationKeys in an existing SQL database.
+ */
+public abstract class SQLDelegationTokenSecretManager<TokenIdent
+ extends AbstractDelegationTokenIdentifier>
+ extends AbstractDelegationTokenSecretManager<TokenIdent> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SQLDelegationTokenSecretManager.class);
+
+ public static final String SQL_DTSM_CONF_PREFIX = "sql-dt-secret-manager.";
+ private static final String SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE = SQL_DTSM_CONF_PREFIX
+ + "token.seqnum.batch.size";
+ public static final int DEFAULT_SEQ_NUM_BATCH_SIZE = 10;
+
+ // Batch of sequence numbers that will be requested by the sequenceNumCounter.
+ // A new batch is requested once the sequenceNums available to a secret manager are
+ // exhausted, including during initialization.
+ private final int seqNumBatchSize;
+
+ // Last sequenceNum in the current batch that has been allocated to a token.
+ private int currentSeqNum;
+
+ // Max sequenceNum in the current batch that can be allocated to a token.
+ // Unused sequenceNums in the current batch cannot be reused by other routers.
+ private int currentMaxSeqNum;
+
+  /**
+   * Creates a secret manager configured from the given Configuration.
+   * The update interval, max lifetime, renew interval and removal scan
+   * interval are read from the DelegationTokenManager settings and multiplied
+   * by 1000 before being handed to the parent class. The sequence number
+   * batch size is read from "sql-dt-secret-manager.token.seqnum.batch.size".
+   * @param conf Configuration to read the settings from.
+   * @throws IllegalArgumentException if the configured sequence number batch
+   *         size is not a positive number.
+   */
+  public SQLDelegationTokenSecretManager(Configuration conf) {
+    super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
+            DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000,
+        conf.getLong(DelegationTokenManager.MAX_LIFETIME,
+            DelegationTokenManager.MAX_LIFETIME_DEFAULT) * 1000,
+        conf.getLong(DelegationTokenManager.RENEW_INTERVAL,
+            DelegationTokenManager.RENEW_INTERVAL_DEFAULT) * 1000,
+        conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
+            DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
+
+    int batchSize = conf.getInt(SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
+        DEFAULT_SEQ_NUM_BATCH_SIZE);
+    // A batch size of zero or less never advances the shared counter, so
+    // incrementDelegationTokenSeqNum() would keep handing out the same
+    // sequence number. Fail fast on such a configuration instead.
+    if (batchSize <= 0) {
+      throw new IllegalArgumentException(SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE
+          + " must be a positive number, but was " + batchSize);
+    }
+    this.seqNumBatchSize = batchSize;
+  }
+
+  /**
+   * Serializes the given TokenInformation and stores it, together with its
+   * TokenIdentifier, in the SQL database before adding the token to the
+   * local cache. The TokenIdentifier is expected to be unique and any
+   * duplicate token attempts will result in an IOException.
+   * @param ident TokenIdentifier to persist.
+   * @param tokenInfo DelegationTokenInformation associated with the TokenIdentifier.
+   * @throws IOException if the token cannot be serialized or stored.
+   */
+  @Override
+  protected void storeToken(TokenIdent ident,
+      DelegationTokenInformation tokenInfo) throws IOException {
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(buffer)) {
+      tokenInfo.write(out);
+
+      // The SQL database is written first; the local cache is only updated
+      // once the insert has succeeded.
+      final int sequenceNum = ident.getSequenceNumber();
+      final byte[] identifierBytes = ident.getBytes();
+      final byte[] tokenInfoBytes = buffer.toByteArray();
+      insertToken(sequenceNum, identifierBytes, tokenInfoBytes);
+
+      super.storeToken(ident, tokenInfo);
+    } catch (SQLException sqlFailure) {
+      throw new IOException("Failed to store token in SQL secret manager", sqlFailure);
+    }
+  }
+
+  /**
+   * Serializes the updated TokenInformation of an existing TokenIdentifier,
+   * hands it to the SQL update operation and then refreshes the local cache.
+   * @param ident Existing TokenIdentifier in the SQL database.
+   * @param tokenInfo Updated DelegationTokenInformation associated with the TokenIdentifier.
+   * @throws IOException if the token cannot be serialized or updated.
+   */
+  @Override
+  protected void updateToken(TokenIdent ident,
+      DelegationTokenInformation tokenInfo) throws IOException {
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(buffer)) {
+      tokenInfo.write(out);
+
+      // SQL database first, local cache second.
+      final int sequenceNum = ident.getSequenceNumber();
+      final byte[] identifierBytes = ident.getBytes();
+      final byte[] tokenInfoBytes = buffer.toByteArray();
+      updateToken(sequenceNum, identifierBytes, tokenInfoBytes);
+
+      super.updateToken(ident, tokenInfo);
+    } catch (SQLException sqlFailure) {
+      throw new IOException("Failed to update token in SQL secret manager", sqlFailure);
+    }
+  }
+
+  /**
+   * Cancels a token. The token's identifier is decoded and looked up through
+   * {@link #getTokenInfo} so that it is present in the local cache, then the
+   * corresponding method in {@link AbstractDelegationTokenSecretManager} is
+   * called to perform validation and remove the token.
+   * @param token Token to cancel.
+   * @param canceller Name of the user requesting the cancellation.
+   * @return Identifier of the canceled token
+   * @throws IOException if the identifier cannot be decoded or the
+   *         cancellation fails.
+   */
+  @Override
+  public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
+      String canceller) throws IOException {
+    try (DataInputStream identifierStream = new DataInputStream(
+        new ByteArrayInputStream(token.getIdentifier()))) {
+      TokenIdent decodedIdent = createIdentifier();
+      decodedIdent.readFields(identifierStream);
+
+      // super.cancelToken() requires the token to be present in the local
+      // cache; getTokenInfo loads it from the SQL database when it is missing.
+      getTokenInfo(decodedIdent);
+    }
+
+    return super.cancelToken(token, canceller);
+  }
+
+ /**
+ * Removes the token that matches the given TokenIdentifier from the SQL
+ * database to invalidate it. A SQLException raised by the delete is logged
+ * as a warning and not propagated to the caller.
+ * NOTE(review): when the delete fails the row stays in the database, and
+ * getTokenInfo falls back to the database on a cache miss, so a removed
+ * token could presumably be loaded again. Confirm whether this best-effort
+ * behavior is intended.
+ * @param ident TokenIdentifier of the token to remove from the SQL database.
+ */
+ @Override
+ protected void removeStoredToken(TokenIdent ident) throws IOException {
+ try {
+ deleteToken(ident.getSequenceNumber(), ident.getBytes());
+ } catch (SQLException e) {
+ LOG.warn("Failed to remove token in SQL secret manager", e);
+ }
+ }
+
+  /**
+   * Obtains the DelegationTokenInformation associated with the given
+   * TokenIdentifier. The local cache is checked first; on a miss the token
+   * is looked up in the SQL database and, when found, added to the local
+   * cache.
+   * @param ident TokenIdentifier to look up.
+   * @return DelegationTokenInformation that matches the given TokenIdentifier or
+   *         null if it doesn't exist in the database or could not be read from it.
+   */
+  @Override
+  protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
+    // Look for token in local cache
+    DelegationTokenInformation tokenInfo = super.getTokenInfo(ident);
+    if (tokenInfo != null) {
+      return tokenInfo;
+    }
+
+    try {
+      // Look for token in SQL database
+      byte[] tokenInfoBytes = selectTokenInfo(ident.getSequenceNumber(), ident.getBytes());
+      if (tokenInfoBytes == null) {
+        return null;
+      }
+
+      // Deserialize into a separate local so that a record that fails to
+      // parse (e.g. truncated bytes) is never returned half-populated.
+      DelegationTokenInformation storedInfo = new DelegationTokenInformation();
+      try (ByteArrayInputStream bis = new ByteArrayInputStream(tokenInfoBytes);
+          DataInputStream dis = new DataInputStream(bis)) {
+        storedInfo.readFields(dis);
+      }
+
+      // Update token in local cache
+      currentTokens.put(ident, storedInfo);
+      return storedInfo;
+    } catch (IOException | SQLException e) {
+      LOG.error("Failed to get token in SQL secret manager", e);
+      return null;
+    }
+  }
+
+  /**
+   * Reads the last reserved sequence number from the SQL database.
+   * @return Last reserved sequence number.
+   * @throws RuntimeException if the value cannot be read from the database.
+   */
+  @Override
+  public int getDelegationTokenSeqNum() {
+    final int lastReserved;
+    try {
+      lastReserved = selectSequenceNum();
+    } catch (SQLException sqlFailure) {
+      throw new RuntimeException(
+          "Failed to get token sequence number in SQL secret manager", sqlFailure);
+    }
+    return lastReserved;
+  }
+
+  /**
+   * Overwrites the last reserved sequence number in the SQL database.
+   * @param seqNum Value to update the sequence number to.
+   * @throws RuntimeException if the value cannot be written to the database.
+   */
+  @Override
+  public void setDelegationTokenSeqNum(int seqNum) {
+    try {
+      updateSequenceNum(seqNum);
+    } catch (SQLException sqlFailure) {
+      final String message =
+          "Failed to update token sequence number in SQL secret manager";
+      throw new RuntimeException(message, sqlFailure);
+    }
+  }
+
+  /**
+   * Obtains the next available sequence number that can be allocated to a Token.
+   * Sequence numbers are handed out from a locally held batch. Once the batch
+   * has been exhausted (which is also the case on the first call), a new batch
+   * is reserved through the shared sequence number counter, which handles
+   * allocation concurrently with other secret managers.
+   * This method ensures that sequence numbers are incremental in a single secret manager,
+   * but not across secret managers.
+   * @return Next available sequence number.
+   * @throws RuntimeException if a new batch cannot be reserved in the database.
+   */
+  @Override
+  public synchronized int incrementDelegationTokenSeqNum() {
+    final boolean batchExhausted = currentSeqNum >= currentMaxSeqNum;
+    if (batchExhausted) {
+      final int batchStart;
+      try {
+        // Reserve a new batch; the returned value is the counter value
+        // before the reservation, so the batch covers the numbers after it.
+        batchStart = incrementSequenceNum(seqNumBatchSize);
+      } catch (SQLException sqlFailure) {
+        throw new RuntimeException(
+            "Failed to increment token sequence number in SQL secret manager", sqlFailure);
+      }
+      currentSeqNum = batchStart;
+      currentMaxSeqNum = batchStart + seqNumBatchSize;
+    }
+
+    currentSeqNum += 1;
+    return currentSeqNum;
+  }
+
+  /**
+   * Serializes a DelegationKey and stores it in the SQL database before
+   * adding it to the local cache. The delegation keyId is expected to be
+   * unique and any duplicate key attempts will result in an IOException.
+   * @param key DelegationKey to persist into the SQL database.
+   * @throws IOException if the key cannot be serialized or stored.
+   */
+  @Override
+  protected void storeDelegationKey(DelegationKey key) throws IOException {
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(buffer)) {
+      key.write(out);
+
+      // SQL database first, local cache second.
+      final int keyId = key.getKeyId();
+      final byte[] keyBytes = buffer.toByteArray();
+      insertDelegationKey(keyId, keyBytes);
+
+      super.storeDelegationKey(key);
+    } catch (SQLException sqlFailure) {
+      throw new IOException("Failed to store delegation key in SQL secret manager", sqlFailure);
+    }
+  }
+
+  /**
+   * Serializes an existing DelegationKey, hands it to the SQL update
+   * operation and then refreshes the local cache.
+   * @param key Updated DelegationKey.
+   * @throws IOException if the key cannot be serialized or updated.
+   */
+  @Override
+  protected void updateDelegationKey(DelegationKey key) throws IOException {
+    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(buffer)) {
+      key.write(out);
+
+      // SQL database first, local cache second.
+      final int keyId = key.getKeyId();
+      final byte[] keyBytes = buffer.toByteArray();
+      updateDelegationKey(keyId, keyBytes);
+
+      super.updateDelegationKey(key);
+    } catch (SQLException sqlFailure) {
+      throw new IOException("Failed to update delegation key in SQL secret manager", sqlFailure);
+    }
+  }
+
+ /**
+ * Removes the existing DelegationKey from the SQL database to
+ * invalidate it. A SQLException raised by the delete is logged as a
+ * warning and not propagated to the caller.
+ * NOTE(review): when the delete fails the key row stays in the database,
+ * and getDelegationKey falls back to the database on a cache miss. Confirm
+ * whether this best-effort behavior is intended.
+ * @param key DelegationKey to remove from the SQL database.
+ */
+ @Override
+ protected void removeStoredMasterKey(DelegationKey key) {
+ try {
+ deleteDelegationKey(key.getKeyId());
+ } catch (SQLException e) {
+ LOG.warn("Failed to remove delegation key in SQL secret manager", e);
+ }
+ }
+
+  /**
+   * Obtains the DelegationKey with the given keyId. The local cache is
+   * checked first; on a miss the key is looked up in the SQL database and,
+   * when found, added to the local cache.
+   * @param keyId KeyId of the DelegationKey to obtain.
+   * @return DelegationKey that matches the given keyId or null
+   *         if it doesn't exist in the database or could not be read from it.
+   */
+  @Override
+  protected DelegationKey getDelegationKey(int keyId) {
+    // Look for delegation key in local cache
+    DelegationKey delegationKey = super.getDelegationKey(keyId);
+    if (delegationKey != null) {
+      return delegationKey;
+    }
+
+    try {
+      // Look for delegation key in SQL database
+      byte[] delegationKeyBytes = selectDelegationKey(keyId);
+      if (delegationKeyBytes == null) {
+        return null;
+      }
+
+      // Deserialize into a separate local so that a record that fails to
+      // parse (e.g. truncated bytes) is never returned half-populated.
+      DelegationKey storedKey = new DelegationKey();
+      try (ByteArrayInputStream bis = new ByteArrayInputStream(delegationKeyBytes);
+          DataInputStream dis = new DataInputStream(bis)) {
+        storedKey.readFields(dis);
+      }
+
+      // Update delegation key in local cache
+      allKeys.put(keyId, storedKey);
+      return storedKey;
+    } catch (IOException | SQLException e) {
+      LOG.error("Failed to get delegation key in SQL secret manager", e);
+      return null;
+    }
+  }
+
+  /**
+   * Reads the last delegation key id from the SQL database.
+   * @return Last delegation key id.
+   * @throws RuntimeException if the value cannot be read from the database.
+   */
+  @Override
+  public int getCurrentKeyId() {
+    final int lastKeyId;
+    try {
+      lastKeyId = selectKeyId();
+    } catch (SQLException sqlFailure) {
+      throw new RuntimeException(
+          "Failed to get delegation key id in SQL secret manager", sqlFailure);
+    }
+    return lastKeyId;
+  }
+
+  /**
+   * Overwrites the last delegation key id in the SQL database.
+   * @param keyId Value to update the delegation key id to.
+   * @throws RuntimeException if the value cannot be written to the database.
+   */
+  @Override
+  public void setCurrentKeyId(int keyId) {
+    try {
+      updateKeyId(keyId);
+    } catch (SQLException sqlFailure) {
+      final String message =
+          "Failed to set delegation key id in SQL secret manager";
+      throw new RuntimeException(message, sqlFailure);
+    }
+  }
+
+  /**
+   * Obtains the next available delegation key id that can be allocated to a DelegationKey.
+   * The id is reserved by incrementing the shared delegation key id counter
+   * by one, which handles keyId allocation concurrently with other secret
+   * managers; the value reported by the counter plus one is returned.
+   * @return Next available delegation key id.
+   * @throws RuntimeException if the counter cannot be incremented in the database.
+   */
+  @Override
+  public int incrementCurrentKeyId() {
+    final int counterValue;
+    try {
+      counterValue = incrementKeyId(1);
+    } catch (SQLException sqlFailure) {
+      throw new RuntimeException(
+          "Failed to increment delegation key id in SQL secret manager", sqlFailure);
+    }
+    return counterValue + 1;
+  }
+
+ // Token operations in SQL database
+ // Tokens are addressed by their sequence number plus the serialized
+ // TokenIdentifier; tokenInfo carries the serialized DelegationTokenInformation.
+ // Callers in this class treat a null result from selectTokenInfo as
+ // "token not found".
+ protected abstract byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier)
+ throws SQLException;
+ protected abstract void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
+ throws SQLException;
+ protected abstract void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
+ throws SQLException;
+ protected abstract void deleteToken(int sequenceNum, byte[] tokenIdentifier)
+ throws SQLException;
+ // Delegation key operations in SQL database
+ // Keys are addressed by keyId; delegationKey carries the serialized
+ // DelegationKey. Callers in this class treat a null result from
+ // selectDelegationKey as "key not found".
+ protected abstract byte[] selectDelegationKey(int keyId) throws SQLException;
+ protected abstract void insertDelegationKey(int keyId, byte[] delegationKey)
+ throws SQLException;
+ protected abstract void updateDelegationKey(int keyId, byte[] delegationKey)
+ throws SQLException;
+ protected abstract void deleteDelegationKey(int keyId) throws SQLException;
+ // Counter operations in SQL database
+ // Callers in this class use the value returned by the increment operations
+ // as the counter value before the increment (the first usable number is the
+ // returned value plus one) - implementations are presumably expected to
+ // follow that contract; confirm against the concrete implementation.
+ protected abstract int selectSequenceNum() throws SQLException;
+ protected abstract void updateSequenceNum(int value) throws SQLException;
+ protected abstract int incrementSequenceNum(int amount) throws SQLException;
+ protected abstract int selectKeyId() throws SQLException;
+ protected abstract void updateKeyId(int value) throws SQLException;
+ protected abstract int incrementKeyId(int amount) throws SQLException;
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
index 9fb868f79f3..9e6f12ba8a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
@@ -117,6 +117,15 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.zaxxer</groupId>
+ <artifactId>HikariCP</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -153,6 +162,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>curator-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
@@ -170,6 +184,14 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemProperties>
+ <property>
+ <name>derby.stream.error.file</name>
+ <value>${project.build.directory}/derby.log</value>
+ </property>
+ </systemProperties>
+ </configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreDatabase.sql b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreDatabase.sql
new file mode 100644
index 00000000000..07fea4c24bc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreDatabase.sql
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Script to create a new Database in MySQL for the TokenStore
+
+CREATE DATABASE IF NOT EXISTS TokenStore;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreTables.sql b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreTables.sql
new file mode 100644
index 00000000000..d377c4e15f2
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreTables.sql
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Script to generate all the tables for the TokenStore in MySQL
+
+-- Terminated with a semicolon so the script also runs through clients that
+-- do not treat USE as a line-oriented command.
+USE TokenStore;
+
+-- One row per delegation token, keyed by sequence number and identifier.
+CREATE TABLE IF NOT EXISTS Tokens(
+  sequenceNum int NOT NULL,
+  tokenIdentifier varbinary(255) NOT NULL,
+  tokenInfo varbinary(255) NOT NULL,
+  modifiedTime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+  PRIMARY KEY(sequenceNum, tokenIdentifier)
+);
+
+-- One row per delegation key, keyed by key id.
+CREATE TABLE IF NOT EXISTS DelegationKeys(
+  keyId int NOT NULL,
+  delegationKey varbinary(255) NOT NULL,
+  modifiedTime timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+  PRIMARY KEY(keyId)
+);
+
+-- Single-row counter table holding the last reserved sequence number.
+CREATE TABLE IF NOT EXISTS LastSequenceNum(
+  sequenceNum int NOT NULL
+);
+
+-- Initialize the LastSequenceNum table with a single entry
+INSERT INTO LastSequenceNum (sequenceNum)
+SELECT 0 WHERE NOT EXISTS (SELECT * FROM LastSequenceNum);
+
+-- Single-row counter table holding the last reserved delegation key id.
+CREATE TABLE IF NOT EXISTS LastDelegationKeyId(
+  keyId int NOT NULL
+);
+
+-- Initialize the LastDelegationKeyId table with a single entry
+INSERT INTO LastDelegationKeyId (keyId)
+SELECT 0 WHERE NOT EXISTS (SELECT * FROM LastDelegationKeyId);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreUser.sql b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreUser.sql
new file mode 100644
index 00000000000..844d7a2f944
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/MySQL/TokenStoreUser.sql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Script to create a new User in MySQL for the TokenStore
+
+-- Update TokenStore user and password on this script
+-- NOTE(review): the '%' host part lets this account connect from any host;
+-- consider restricting it to the hosts that need access - confirm against
+-- the deployment.
+CREATE USER IF NOT EXISTS 'TokenStoreUser'@'%' IDENTIFIED BY 'TokenStorePassword';
+
+-- Grants every privilege on all objects of the TokenStore database.
+GRANT ALL PRIVILEGES ON TokenStore.* TO 'TokenStoreUser'@'%';
+
+FLUSH PRIVILEGES;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/README b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/README
new file mode 100644
index 00000000000..72425315319
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/scripts/TokenStore/README
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+These scripts must be executed to create the TokenStore database, tables and users needed to use the
+SQLDelegationTokenSecretManagerImpl as the delegation token secret manager:
+1. TokenStoreDatabase.sql
+2. TokenStoreTables.sql
+3. TokenStoreUser.sql
+
+Note: The TokenStoreUser.sql script defines a default user name and password. You are strongly
+encouraged to replace the default password with a strong one before running the script.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/DistributedSQLCounter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/DistributedSQLCounter.java
new file mode 100644
index 00000000000..14b232783f5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/DistributedSQLCounter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed counter that relies on a SQL database to synchronize
+ * between multiple clients. This expects a table with a single int field
+ * to exist in the database. One record must exist on the table at all times,
+ * representing the last used value reserved by a client.
+ */
+public class DistributedSQLCounter {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DistributedSQLCounter.class);
+
+  private final String field;
+  private final String table;
+  private final SQLConnectionFactory connectionFactory;
+
+  /**
+   * Creates a counter backed by the given field of the given table.
+   * The field and table names are inserted verbatim into the SQL statements
+   * issued by this class.
+   * @param field Name of the int column holding the counter value.
+   * @param table Name of the table holding the counter record.
+   * @param connectionFactory Factory used to obtain database connections.
+   */
+  public DistributedSQLCounter(String field, String table,
+      SQLConnectionFactory connectionFactory) {
+    this.field = field;
+    this.table = table;
+    this.connectionFactory = connectionFactory;
+  }
+
+  /**
+   * Obtains the value of the counter.
+   * @return counter value.
+   * @throws SQLException if the value cannot be read from the database.
+   * @throws IllegalStateException if the counter table holds no record.
+   */
+  public int selectCounterValue() throws SQLException {
+    try (Connection connection = connectionFactory.getConnection()) {
+      return selectCounterValue(false, connection);
+    }
+  }
+
+  /**
+   * Reads the counter value using the given connection.
+   * @param forUpdate Whether to append "FOR UPDATE" to the select statement.
+   * @param connection Connection to database hosting the counter table.
+   * @return counter value.
+   */
+  private int selectCounterValue(boolean forUpdate, Connection connection) throws SQLException {
+    String query = String.format("SELECT %s FROM %s %s", field, table,
+        forUpdate ? "FOR UPDATE" : "");
+    // Parameterized logging avoids building the message when debug is disabled.
+    LOG.debug("Select counter statement: {}", query);
+    try (Statement statement = connection.createStatement();
+        ResultSet result = statement.executeQuery(query)) {
+      if (result.next()) {
+        return result.getInt(field);
+      } else {
+        throw new IllegalStateException("Counter table not initialized: " + table);
+      }
+    }
+  }
+
+  /**
+   * Sets the counter to the given value.
+   * @param value Value to assign to counter.
+   * @throws SQLException if the value cannot be written to the database.
+   */
+  public void updateCounterValue(int value) throws SQLException {
+    try (Connection connection = connectionFactory.getConnection(true)) {
+      updateCounterValue(value, connection);
+    }
+  }
+
+  /**
+   * Sets the counter to the given value.
+   * @param connection Connection to database hosting the counter table.
+   * @param value Value to assign to counter.
+   * @throws SQLException if the value cannot be written to the database.
+   */
+  public void updateCounterValue(int value, Connection connection) throws SQLException {
+    String queryText = String.format("UPDATE %s SET %s = ?", table, field);
+    LOG.debug("Update counter statement: {}. Value: {}", queryText, value);
+    try (PreparedStatement statement = connection.prepareStatement(queryText)) {
+      statement.setInt(1, value);
+      statement.execute();
+    }
+  }
+
+  /**
+   * Increments the counter by the given amount and
+   * returns the previous counter value.
+   * If the addition overflows to a negative number, the counter restarts:
+   * 0 is returned as the previous value and the counter is set to the amount.
+   * @param amount Amount to increase the counter.
+   * @return Previous counter value.
+   * @throws SQLException if the counter cannot be read or updated; a failure
+   *         to roll back is attached to it as a suppressed exception.
+   */
+  public int incrementCounterValue(int amount) throws SQLException {
+    // Disabling auto-commit to ensure that all statements on this transaction
+    // are committed at once.
+    try (Connection connection = connectionFactory.getConnection(false)) {
+      // Preventing dirty reads and non-repeatable reads to ensure that the
+      // value read will not be updated by a different connection.
+      if (connection.getTransactionIsolation() < Connection.TRANSACTION_REPEATABLE_READ) {
+        connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+      }
+
+      try {
+        // Reading the counter value "FOR UPDATE" to lock the value record,
+        // forcing other connections to wait until this transaction is committed.
+        int lastValue = selectCounterValue(true, connection);
+
+        // Calculate the new counter value and handling overflow by
+        // resetting the counter to 0.
+        int newValue = lastValue + amount;
+        if (newValue < 0) {
+          lastValue = 0;
+          newValue = amount;
+        }
+
+        updateCounterValue(newValue, connection);
+        connection.commit();
+        return lastValue;
+      } catch (Exception e) {
+        // Rollback transaction to release table locks. A rollback failure
+        // must not replace the exception that caused it, so it is attached
+        // as a suppressed exception instead.
+        try {
+          connection.rollback();
+        } catch (SQLException rollbackFailure) {
+          e.addSuppressed(rollbackFailure);
+        }
+        throw e;
+      }
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/HikariDataSourceConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/HikariDataSourceConnectionFactory.java
new file mode 100644
index 00000000000..5510e9f54b9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/HikariDataSourceConnectionFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Class that relies on a HikariDataSource to provide SQL connections.
+ */
+class HikariDataSourceConnectionFactory implements SQLConnectionFactory {
+ protected final static String HIKARI_PROPS = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ + "connection.hikari.";
+ private final HikariDataSource dataSource;
+
+ HikariDataSourceConnectionFactory(Configuration conf) {
+ Properties properties = new Properties();
+ properties.setProperty("jdbcUrl", conf.get(CONNECTION_URL));
+ properties.setProperty("username", conf.get(CONNECTION_USERNAME));
+ properties.setProperty("password", conf.get(CONNECTION_PASSWORD));
+ properties.setProperty("driverClassName", conf.get(CONNECTION_DRIVER));
+
+ // Include hikari connection properties
+ properties.putAll(conf.getPropsWithPrefix(HIKARI_PROPS));
+
+ HikariConfig hikariConfig = new HikariConfig(properties);
+ this.dataSource = new HikariDataSource(hikariConfig);
+ }
+
+ @Override
+ public Connection getConnection() throws SQLException {
+ return dataSource.getConnection();
+ }
+
+ @Override
+ public void shutdown() {
+ // Close database connections
+ dataSource.close();
+ }
+
+ @VisibleForTesting
+ HikariDataSource getDataSource() {
+ return dataSource;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLConnectionFactory.java
new file mode 100644
index 00000000000..a464cc81968
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLConnectionFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import com.mysql.cj.jdbc.MysqlDataSource;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
+
+
+/**
+ * Interface to provide SQL connections to the {@link SQLDelegationTokenSecretManagerImpl}.
+ */
+public interface SQLConnectionFactory {
+ String CONNECTION_URL = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ + "connection.url";
+ String CONNECTION_USERNAME = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ + "connection.username";
+ String CONNECTION_PASSWORD = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ + "connection.password";
+ String CONNECTION_DRIVER = SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX
+ + "connection.driver";
+
+ Connection getConnection() throws SQLException;
+ void shutdown();
+
+ default Connection getConnection(boolean autocommit) throws SQLException {
+ Connection connection = getConnection();
+ connection.setAutoCommit(autocommit);
+ return connection;
+ }
+}
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java
new file mode 100644
index 00000000000..7da54778f31
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLDelegationTokenSecretManagerImpl.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * An implementation of {@link SQLDelegationTokenSecretManager} that
+ * persists TokenIdentifiers and DelegationKeys in a SQL database.
+ * This implementation relies on the Datanucleus JDO PersistenceManager, which
+ * can be configured with datanucleus.* configuration properties.
+ */
+public class SQLDelegationTokenSecretManagerImpl
+ extends SQLDelegationTokenSecretManager<AbstractDelegationTokenIdentifier> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SQLDelegationTokenSecretManagerImpl.class);
+ private static final String SEQ_NUM_COUNTER_FIELD = "sequenceNum";
+ private static final String SEQ_NUM_COUNTER_TABLE = "LastSequenceNum";
+ private static final String KEY_ID_COUNTER_FIELD = "keyId";
+ private static final String KEY_ID_COUNTER_TABLE = "LastDelegationKeyId";
+
+ private final SQLConnectionFactory connectionFactory;
+ private final DistributedSQLCounter sequenceNumCounter;
+ private final DistributedSQLCounter delegationKeyIdCounter;
+ private final SQLSecretManagerRetriableHandler retryHandler;
+
+ public SQLDelegationTokenSecretManagerImpl(Configuration conf) {
+ this(conf, new HikariDataSourceConnectionFactory(conf),
+ SQLSecretManagerRetriableHandlerImpl.getInstance(conf));
+ }
+
+ public SQLDelegationTokenSecretManagerImpl(Configuration conf,
+ SQLConnectionFactory connectionFactory, SQLSecretManagerRetriableHandler retryHandler) {
+ super(conf);
+
+ this.connectionFactory = connectionFactory;
+ this.sequenceNumCounter = new DistributedSQLCounter(SEQ_NUM_COUNTER_FIELD,
+ SEQ_NUM_COUNTER_TABLE, connectionFactory);
+ this.delegationKeyIdCounter = new DistributedSQLCounter(KEY_ID_COUNTER_FIELD,
+ KEY_ID_COUNTER_TABLE, connectionFactory);
+ this.retryHandler = retryHandler;
+
+ try {
+ super.startThreads();
+ } catch (IOException e) {
+ throw new RuntimeException("Error starting threads for MySQL secret manager", e);
+ }
+
+ LOG.info("MySQL delegation token secret manager instantiated");
+ }
+
+ @Override
+ public DelegationTokenIdentifier createIdentifier() {
+ return new DelegationTokenIdentifier();
+ }
+
+ @Override
+ public void stopThreads() {
+ super.stopThreads();
+ connectionFactory.shutdown();
+ }
+
+ @Override
+ protected void insertToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
+ throws SQLException {
+ retryHandler.execute(() -> {
+ try (Connection connection = connectionFactory.getConnection(true);
+ PreparedStatement statement = connection.prepareStatement(
+ "INSERT INTO Tokens (sequenceNum, tokenIdentifier, tokenInfo) VALUES (?, ?, ?)")) {
+ statement.setInt(1, sequenceNum);
+ statement.setBytes(2, tokenIdentifier);
+ statement.setBytes(3, tokenInfo);
+ statement.execute();
+ }
+ });
+ }
+
+ @Override
+ protected void updateToken(int sequenceNum, byte[] tokenIdentifier, byte[] tokenInfo)
+ throws SQLException {
+ retryHandler.execute(() -> {
+ try (Connection connection = connectionFactory.getConnection(true);
+ PreparedStatement statement = connection.prepareStatement(
+ "UPDATE Tokens SET tokenInfo = ? WHERE sequenceNum = ? AND tokenIdentifier = ?")) {
+ statement.setBytes(1, tokenInfo);
+ statement.setInt(2, sequenceNum);
+ statement.setBytes(3, tokenIdentifier);
+ statement.execute();
+ }
+ });
+ }
+
+ @Override
+ protected void deleteToken(int sequenceNum, byte[] tokenIdentifier) throws SQLException {
+ retryHandler.execute(() -> {
+ try (Connection connection = connectionFactory.getConnection(true);
+ PreparedStatement statement = connection.prepareStatement(
+ "DELETE FROM Tokens WHERE sequenceNum = ? AND tokenIdentifier = ?")) {
+ statement.setInt(1, sequenceNum);
+ statement.setBytes(2, tokenIdentifier);
+ statement.execute();
+ }
+ });
+ }
+
+ @Override
+ protected byte[] selectTokenInfo(int sequenceNum, byte[] tokenIdentifier) throws SQLException {
+ return retryHandler.execute(() -> {
+ try (Connection connection = connectionFactory.getConnection();
+ PreparedStatement statement = connection.prepareStatement(
+ "SELECT tokenInfo FROM Tokens WHERE sequenceNum = ? AND tokenIdentifier = ?")) {
+ statement.setInt(1, sequenceNum);
+ statement.setBytes(2, tokenIdentifier);
+ try (ResultSet result = statement.executeQuery()) {
+ if (result.next()) {
+ return result.getBytes("tokenInfo");
+ }
+ }
+ }
+ return null;
+ });
+ }
+
+ @Override
+ protected void insertDelegationKey(int keyId, byte[] delegationKey) throws SQLException {
+ retryHandler.execute(() -> {
+ try (Connection connection = connectionFactory.getConnection(true);
+ PreparedStatement statement = connection.prepareStatement(
+ "INSERT INTO DelegationKeys (keyId, delegationKey) VALUES (?, ?)")) {
+ statement.setInt(1, keyId);
+ statement.setBytes(2, delegationKey);
+ statement.execute();
+ }
+ });
+ }
+
+ @Override
+ protected void updateDelegationKey(int keyId, byte[] delegationKey) throws SQLException {
+ retryHandler.execute(() -> {
+ try (Connection connection = connectionFactory.getConnection(true);
+ PreparedStatement statement = connection.prepareStatement(
+ "UPDATE DelegationKeys SET delegationKey = ? WHERE keyId = ?")) {
+ statement.setBytes(1, delegationKey);
+ statement.setInt(2, keyId);
+ statement.execute();
+ }
+ });
+ }
+
+ @Override
+ protected void deleteDelegationKey(int keyId) throws SQLException {
+ retryHandler.execute(() -> {
+ try (Connection connection = connectionFactory.getConnection(true);
+ PreparedStatement statement = connection.prepareStatement(
+ "DELETE FROM DelegationKeys WHERE keyId = ?")) {
+ statement.setInt(1, keyId);
+ statement.execute();
+ }
+ });
+ }
+
+ @Override
+ protected byte[] selectDelegationKey(int keyId) throws SQLException {
+ return retryHandler.execute(() -> {
+ try (Connection connection = connectionFactory.getConnection();
+ PreparedStatement statement = connection.prepareStatement(
+ "SELECT delegationKey FROM DelegationKeys WHERE keyId = ?")) {
+ statement.setInt(1, keyId);
+ try (ResultSet result = statement.executeQuery()) {
+ if (result.next()) {
+ return result.getBytes("delegationKey");
+ }
+ }
+ }
+ return null;
+ });
+ }
+
+ @Override
+ protected int selectSequenceNum() throws SQLException {
+ return retryHandler.execute(() -> sequenceNumCounter.selectCounterValue());
+ }
+
+ @Override
+ protected void updateSequenceNum(int value) throws SQLException {
+ retryHandler.execute(() -> sequenceNumCounter.updateCounterValue(value));
+ }
+
+ @Override
+ protected int incrementSequenceNum(int amount) throws SQLException {
+ return retryHandler.execute(() -> sequenceNumCounter.incrementCounterValue(amount));
+ }
+
+ @Override
+ protected int selectKeyId() throws SQLException {
+ return retryHandler.execute(delegationKeyIdCounter::selectCounterValue);
+ }
+
+ @Override
+ protected void updateKeyId(int value) throws SQLException {
+ retryHandler.execute(() -> delegationKeyIdCounter.updateCounterValue(value));
+ }
+
+ @Override
+ protected int incrementKeyId(int amount) throws SQLException {
+ return retryHandler.execute(() -> delegationKeyIdCounter.incrementCounterValue(amount));
+ }
+
+ @VisibleForTesting
+ protected SQLConnectionFactory getConnectionFactory() {
+ return connectionFactory;
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLSecretManagerRetriableHandler.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLSecretManagerRetriableHandler.java
new file mode 100644
index 00000000000..16151226217
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/SQLSecretManagerRetriableHandler.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Interface to handle retries when {@link SQLDelegationTokenSecretManagerImpl}
+ * throws expected errors.
+ */
+public interface SQLSecretManagerRetriableHandler {
+ void execute(SQLCommandVoid command) throws SQLException;
+ <T> T execute(SQLCommand<T> command) throws SQLException;
+
+ @FunctionalInterface
+ interface SQLCommandVoid {
+ void doCall() throws SQLException;
+ }
+
+ @FunctionalInterface
+ interface SQLCommand<T> {
+ T doCall() throws SQLException;
+ }
+}
+
+/**
+ * Implementation of {@link SQLSecretManagerRetriableHandler} that uses a
+ * {@link RetryProxy} to simplify the retryable operations.
+ */
+class SQLSecretManagerRetriableHandlerImpl implements SQLSecretManagerRetriableHandler {
+ public final static String MAX_RETRIES =
+ SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + "max-retries";
+ public final static int MAX_RETRIES_DEFAULT = 0;
+ public final static String RETRY_SLEEP_TIME_MS =
+ SQLDelegationTokenSecretManager.SQL_DTSM_CONF_PREFIX + "retry-sleep-time-ms";
+ public final static long RETRY_SLEEP_TIME_MS_DEFAULT = 100;
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SQLSecretManagerRetriableHandlerImpl.class);
+
+ static SQLSecretManagerRetriableHandler getInstance(Configuration conf) {
+ return getInstance(conf, new SQLSecretManagerRetriableHandlerImpl());
+ }
+
+ static SQLSecretManagerRetriableHandler getInstance(Configuration conf,
+ SQLSecretManagerRetriableHandlerImpl retryHandler) {
+ RetryPolicy basePolicy = RetryPolicies.exponentialBackoffRetry(
+ conf.getInt(MAX_RETRIES, MAX_RETRIES_DEFAULT),
+ conf.getLong(RETRY_SLEEP_TIME_MS, RETRY_SLEEP_TIME_MS_DEFAULT),
+ TimeUnit.MILLISECONDS);
+
+ // Configure SQLSecretManagerRetriableException to retry with exponential backoff
+ Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap = new HashMap<>();
+ exceptionToPolicyMap.put(SQLSecretManagerRetriableException.class, basePolicy);
+
+ // Configure all other exceptions to fail after one attempt
+ RetryPolicy retryPolicy = RetryPolicies.retryByException(
+ RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+
+ return (SQLSecretManagerRetriableHandler) RetryProxy.create(
+ SQLSecretManagerRetriableHandler.class, retryHandler, retryPolicy);
+ }
+
+ /**
+ * Executes a SQL command and raises retryable errors as
+ * {@link SQLSecretManagerRetriableException}s so they are recognized by the
+ * {@link RetryProxy}.
+ * @param command SQL command to execute
+ * @throws SQLException When SQL connection errors occur
+ */
+ @Override
+ public void execute(SQLCommandVoid command) throws SQLException {
+ try {
+ command.doCall();
+ } catch (SQLException e) {
+ LOG.warn("Failed to execute SQL command", e);
+ throw new SQLSecretManagerRetriableException(e);
+ }
+ }
+
+ /**
+ * Executes a SQL command and raises retryable errors as
+ * {@link SQLSecretManagerRetriableException}s so they are recognized by the
+ * {@link RetryProxy}.
+ * @param command SQL command to execute
+ * @throws SQLException When SQL connection errors occur
+ */
+ @Override
+ public <T> T execute(SQLCommand<T> command) throws SQLException {
+ try {
+ return command.doCall();
+ } catch (SQLException e) {
+ LOG.warn("Failed to execute SQL command", e);
+ throw new SQLSecretManagerRetriableException(e);
+ }
+ }
+
+ /**
+ * Class used to identify errors that can be retried.
+ */
+ static class SQLSecretManagerRetriableException extends SQLException {
+ SQLSecretManagerRetriableException(Throwable cause) {
+ super(cause);
+ }
+ }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java
new file mode 100644
index 00000000000..569a274042b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.router.security.token;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+public class TestSQLDelegationTokenSecretManagerImpl {
+ private static final String CONNECTION_URL = "jdbc:derby:memory:TokenStore";
+ private static final int TEST_MAX_RETRIES = 3;
+ private static Configuration conf;
+
+ /**
+  * Creates the test database tables before each test.
+  */
+ @Before
+ public void init() throws SQLException {
+ createTestDBTables();
+ }
+
+ /**
+  * Drops the test database tables after each test.
+  */
+ @After
+ public void cleanup() throws SQLException {
+ dropTestDBTables();
+ }
+
+ /**
+  * Creates the in-memory test database and the configuration shared by all
+  * the tests of this class.
+  */
+ @BeforeClass
+ public static void initDatabase() throws SQLException {
+   // The connection is only needed to trigger the creation of the database,
+   // so it is closed right away instead of being left open for the whole run.
+   DriverManager.getConnection(CONNECTION_URL + ";create=true").close();
+
+   conf = new Configuration();
+   conf.set(SQLConnectionFactory.CONNECTION_URL, CONNECTION_URL);
+   conf.set(SQLConnectionFactory.CONNECTION_USERNAME, "testuser");
+   conf.set(SQLConnectionFactory.CONNECTION_PASSWORD, "testpassword");
+   conf.set(SQLConnectionFactory.CONNECTION_DRIVER, "org.apache.derby.jdbc.EmbeddedDriver");
+   conf.setInt(SQLSecretManagerRetriableHandlerImpl.MAX_RETRIES, TEST_MAX_RETRIES);
+   conf.setInt(SQLSecretManagerRetriableHandlerImpl.RETRY_SLEEP_TIME_MS, 10);
+ }
+
+ @AfterClass
+ public static void cleanupDatabase() {
+ try {
+ DriverManager.getConnection(CONNECTION_URL + ";drop=true");
+ } catch (SQLException e) {
+ // SQLException expected when database is dropped
+ if (!e.getMessage().contains("dropped")) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+  * Validates a token with the same token manager that created it.
+  */
+ @Test
+ public void testSingleSecretManager() throws Exception {
+   DelegationTokenManager manager = createTokenManager();
+   try {
+     UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+     Token<? extends AbstractDelegationTokenIdentifier> issuedToken =
+         manager.createToken(currentUser, "foo");
+     validateToken(manager, issuedToken);
+   } finally {
+     stopTokenManager(manager);
+   }
+ }
+
+ /**
+  * Validates each token with a token manager other than the one that created it.
+  */
+ @Test
+ public void testMultipleSecretManagers() throws Exception {
+   DelegationTokenManager firstManager = createTokenManager();
+   DelegationTokenManager secondManager = createTokenManager();
+
+   try {
+     Token<? extends AbstractDelegationTokenIdentifier> tokenFromFirst =
+         firstManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
+     Token<? extends AbstractDelegationTokenIdentifier> tokenFromSecond =
+         secondManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
+
+     // Cross-check: every manager handles the token issued by the other one
+     validateToken(firstManager, tokenFromSecond);
+     validateToken(secondManager, tokenFromFirst);
+   } finally {
+     stopTokenManager(firstManager);
+     stopTokenManager(secondManager);
+   }
+ }
+
+ /**
+  * Creates tokens on three token managers in round-robin order and verifies
+  * that every sequence number is unique and that the batches of sequence
+  * numbers are handed out to the managers in order.
+  */
+ @Test
+ public void testSequenceNumAllocation() throws Exception {
+   int batchSize = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE;
+   int tokensPerManager = batchSize * 5;
+   int totalTokens = tokensPerManager * 3;
+   Set<Integer> sequenceNums1 = new HashSet<>();
+   Set<Integer> sequenceNums2 = new HashSet<>();
+   Set<Integer> sequenceNums3 = new HashSet<>();
+   Set<Integer> sequenceNums = new HashSet<>();
+   DelegationTokenManager tokenManager1 = createTokenManager();
+   DelegationTokenManager tokenManager2 = createTokenManager();
+   DelegationTokenManager tokenManager3 = createTokenManager();
+
+   try {
+     for (int i = 0; i < tokensPerManager; i++) {
+       allocateSequenceNum(tokenManager1, sequenceNums1);
+       allocateSequenceNum(tokenManager2, sequenceNums2);
+       allocateSequenceNum(tokenManager3, sequenceNums3);
+     }
+     // Merging once after the loop yields the same set as merging on every
+     // iteration, without re-adding all previous numbers each time.
+     sequenceNums.addAll(sequenceNums1);
+     sequenceNums.addAll(sequenceNums2);
+     sequenceNums.addAll(sequenceNums3);
+
+     Assert.assertEquals("Verify that all tokens were created with unique sequence numbers",
+         totalTokens, sequenceNums.size());
+     Assert.assertEquals("Verify that tokenManager1 generated unique sequence numbers",
+         tokensPerManager, sequenceNums1.size());
+     Assert.assertEquals("Verify that tokenManager2 generated unique sequence number",
+         tokensPerManager, sequenceNums2.size());
+     Assert.assertEquals("Verify that tokenManager3 generated unique sequence numbers",
+         tokensPerManager, sequenceNums3.size());
+
+     // Validate sequence number batches allocated in order to each token manager.
+     // The three managers consume totalTokens sequence numbers altogether, so
+     // the walk has to cover all of them and not only the first tokensPerManager.
+     for (int seqNum = 1; seqNum <= totalTokens;) {
+       // First batch allocated tokenManager1
+       for (int i = 0; i < batchSize; i++, seqNum++) {
+         Assert.assertTrue(sequenceNums1.contains(seqNum));
+       }
+       // Second batch allocated tokenManager2
+       for (int i = 0; i < batchSize; i++, seqNum++) {
+         Assert.assertTrue(sequenceNums2.contains(seqNum));
+       }
+       // Third batch allocated tokenManager3
+       for (int i = 0; i < batchSize; i++, seqNum++) {
+         Assert.assertTrue(sequenceNums3.contains(seqNum));
+       }
+     }
+
+     SQLDelegationTokenSecretManagerImpl secretManager =
+         (SQLDelegationTokenSecretManagerImpl) tokenManager1.getDelegationTokenSecretManager();
+     Assert.assertEquals("Verify that the counter is set to the highest sequence number",
+         totalTokens, secretManager.getDelegationTokenSeqNum());
+   } finally {
+     stopTokenManager(tokenManager1);
+     stopTokenManager(tokenManager2);
+     stopTokenManager(tokenManager3);
+   }
+ }
+
+ /**
+  * Positions the sequence number one batch below Integer.MAX_VALUE and
+  * verifies the numbers allocated before and after the counter rolls over.
+  */
+ @Test
+ public void testSequenceNumRollover() throws Exception {
+   final int batchSize = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE;
+   final int startingSeqNum = Integer.MAX_VALUE - batchSize;
+   Set<Integer> allocated = new HashSet<>();
+
+   DelegationTokenManager tokenManager = createTokenManager();
+
+   try {
+     SQLDelegationTokenSecretManagerImpl secretManager =
+         (SQLDelegationTokenSecretManagerImpl) tokenManager.getDelegationTokenSecretManager();
+     secretManager.setDelegationTokenSeqNum(startingSeqNum);
+
+     // Numbers allocated before the rollover end exactly at Integer.MAX_VALUE
+     for (int offset = 1; offset <= batchSize; offset++) {
+       allocateSequenceNum(tokenManager, allocated);
+       Assert.assertTrue(allocated.contains(startingSeqNum + offset));
+     }
+
+     // Numbers allocated after the rollover start again from 1
+     for (int expected = 1; expected <= batchSize; expected++) {
+       allocateSequenceNum(tokenManager, allocated);
+       Assert.assertTrue(allocated.contains(expected));
+     }
+   } finally {
+     stopTokenManager(tokenManager);
+   }
+ }
+
+ /**
+  * Verifies that two secret managers use different current delegation keys
+  * and that each one keeps signing its tokens with its own key.
+  */
+ @Test
+ public void testDelegationKeyAllocation() throws Exception {
+ DelegationTokenManager tokenManager1 = createTokenManager();
+
+ try {
+ SQLDelegationTokenSecretManagerImpl secretManager1 =
+ (SQLDelegationTokenSecretManagerImpl) tokenManager1.getDelegationTokenSecretManager();
+ // Prevent delegation keys from rolling for the rest of the test to avoid race conditions
+ // between the keys generated and the active keys in the database.
+ ((TestDelegationTokenSecretManager) secretManager1).lockKeyRoll();
+ int keyId1 = secretManager1.getCurrentKeyId();
+
+ // Validate that latest key1 is assigned to tokenManager1 tokens
+ Token<? extends AbstractDelegationTokenIdentifier> token1 =
+ tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+ validateKeyId(token1, keyId1);
+
+ // The second manager is created only after the first one has issued a
+ // token with its key; it is stopped in its own finally block.
+ DelegationTokenManager tokenManager2 = createTokenManager();
+ try {
+ SQLDelegationTokenSecretManagerImpl secretManager2 =
+ (SQLDelegationTokenSecretManagerImpl) tokenManager2.getDelegationTokenSecretManager();
+ // Prevent delegation keys from rolling for the rest of the test
+ ((TestDelegationTokenSecretManager) secretManager2).lockKeyRoll();
+ int keyId2 = secretManager2.getCurrentKeyId();
+
+ Assert.assertNotEquals("Each secret manager has its own key", keyId1, keyId2);
+
+ // Validate that latest key2 is assigned to tokenManager2 tokens
+ Token<? extends AbstractDelegationTokenIdentifier> token2 =
+ tokenManager2.createToken(UserGroupInformation.getCurrentUser(), "foo");
+ validateKeyId(token2, keyId2);
+
+ // Validate that key1 is still assigned to tokenManager1 tokens
+ token1 = tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+ validateKeyId(token1, keyId1);
+
+ // Validate that key2 is still assigned to tokenManager2 tokens
+ token2 = tokenManager2.createToken(UserGroupInformation.getCurrentUser(), "foo");
+ validateKeyId(token2, keyId2);
+ } finally {
+ stopTokenManager(tokenManager2);
+ }
+ } finally {
+ stopTokenManager(tokenManager1);
+ }
+ }
+
+ /**
+  * Verifies that properties configured under the Hikari prefix are applied
+  * to the data source created by the connection factory.
+  */
+ @Test
+ public void testHikariConfigs() {
+   HikariDataSourceConnectionFactory factory1 = new HikariDataSourceConnectionFactory(conf);
+   int defaultMaximumPoolSize;
+   try {
+     defaultMaximumPoolSize = factory1.getDataSource().getMaximumPoolSize();
+   } finally {
+     factory1.shutdown();
+   }
+
+   // Changing default maximumPoolSize
+   Configuration hikariConf = new Configuration(conf);
+   hikariConf.setInt(HikariDataSourceConnectionFactory.HIKARI_PROPS + "maximumPoolSize",
+       defaultMaximumPoolSize + 1);
+
+   // Verifying property is correctly set in datasource
+   HikariDataSourceConnectionFactory factory2 = new HikariDataSourceConnectionFactory(hikariConf);
+   try {
+     // Expected value goes first so that a failure message reports the
+     // expected and actual values the right way around.
+     Assert.assertEquals(defaultMaximumPoolSize + 1,
+         factory2.getDataSource().getMaximumPoolSize());
+   } finally {
+     // Shut the pool down even when the assertion fails
+     factory2.shutdown();
+   }
+ }
+
+ /**
+  * Verifies the number of execution attempts recorded by the retry handler:
+  * a single attempt when creating a token succeeds, and TEST_MAX_RETRIES + 1
+  * attempts once the secret manager has been switched to read-only.
+  */
+ @Test
+ public void testRetries() throws Exception {
+ DelegationTokenManager tokenManager = createTokenManager();
+ TestDelegationTokenSecretManager secretManager =
+ (TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager();
+
+ try {
+ // Prevent delegation keys from rolling for the rest of the test
+ secretManager.lockKeyRoll();
+
+ // Reset counter and expect a single request when inserting a token
+ TestRetryHandler.resetExecutionAttemptCounter();
+ tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
+ Assert.assertEquals(1, TestRetryHandler.getExecutionAttempts());
+
+ // Breaking database connections to cause retries
+ secretManager.setReadOnly(true);
+
+ // Reset counter and expect multiple retries when failing to insert a token
+ TestRetryHandler.resetExecutionAttemptCounter();
+ tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
+ Assert.assertEquals(TEST_MAX_RETRIES + 1, TestRetryHandler.getExecutionAttempts());
+ } finally {
+ // Fix database connections before stopping the token manager
+ secretManager.setReadOnly(false);
+ stopTokenManager(tokenManager);
+ }
+ }
+
+ /**
+  * Builds a token manager backed by a new TestDelegationTokenSecretManager.
+  */
+ private DelegationTokenManager createTokenManager() {
+   DelegationTokenManager manager = new DelegationTokenManager(new Configuration(), null);
+   TestDelegationTokenSecretManager secretManager = new TestDelegationTokenSecretManager();
+   manager.setExternalDelegationTokenSecretManager(secretManager);
+   return manager;
+ }
+
+ /**
+  * Creates a token through the given manager and records its sequence number in
+  * {@code sequenceNums}, failing if that number has already been recorded.
+  */
+ private void allocateSequenceNum(DelegationTokenManager tokenManager, Set<Integer> sequenceNums)
+     throws IOException {
+   Token<? extends AbstractDelegationTokenIdentifier> issued =
+       tokenManager.createToken(UserGroupInformation.getCurrentUser(), "foo");
+   int sequenceNum = issued.decodeIdentifier().getSequenceNumber();
+
+   // Set.add returns false (and leaves the set untouched) when the value is already present
+   Assert.assertTrue("Verify sequence number is unique", sequenceNums.add(sequenceNum));
+ }
+
+ /**
+  * Walks a token through verify, renew and cancel using the token manager. After each
+  * step the token's stored info is read back through the secret manager to confirm that
+  * it exists, then differs after renewal, and finally is gone after cancellation.
+  */
+ private void validateToken(DelegationTokenManager tokenManager,
+     Token<? extends AbstractDelegationTokenIdentifier> token)
+     throws Exception {
+   SQLDelegationTokenSecretManagerImpl sqlSecretManager =
+       (SQLDelegationTokenSecretManagerImpl) tokenManager.getDelegationTokenSecretManager();
+   AbstractDelegationTokenIdentifier identifier = token.decodeIdentifier();
+   int sequenceNum = identifier.getSequenceNumber();
+
+   // Step 1: verification must pass and the token must be present in the database
+   tokenManager.verifyToken(token);
+   byte[] infoAfterVerify = sqlSecretManager.selectTokenInfo(sequenceNum, identifier.getBytes());
+   Assert.assertNotNull("Verify token exists in database", infoAfterVerify);
+
+   // Step 2: renewal must keep the token but change its stored info
+   tokenManager.renewToken(token, "foo");
+   byte[] infoAfterRenew = sqlSecretManager.selectTokenInfo(sequenceNum, identifier.getBytes());
+   Assert.assertNotNull("Verify token exists in database", infoAfterRenew);
+   boolean unchanged = Arrays.equals(infoAfterVerify, infoAfterRenew);
+   Assert.assertFalse("Verify token has been updated in database", unchanged);
+
+   // Step 3: cancellation must remove the token
+   tokenManager.cancelToken(token, "foo");
+   byte[] infoAfterCancel = sqlSecretManager.selectTokenInfo(sequenceNum, identifier.getBytes());
+   Assert.assertNull("Verify token was removed from database", infoAfterCancel);
+ }
+
+ /**
+  * Asserts that the identifier decoded from the given token carries the expected
+  * master key id.
+  *
+  * @param token token whose identifier is decoded and inspected
+  * @param expectedKeyiD master key id the token is expected to reference
+  * @throws IOException if the token identifier cannot be decoded
+  */
+ private void validateKeyId(Token<? extends AbstractDelegationTokenIdentifier> token,
+     int expectedKeyiD) throws IOException {
+   AbstractDelegationTokenIdentifier tokenIdentifier = token.decodeIdentifier();
+   // JUnit's signature is assertEquals(message, expected, actual); passing the expected
+   // value first keeps the "expected:<..> but was:<..>" failure message truthful.
+   Assert.assertEquals("Verify that keyId is assigned to token",
+       expectedKeyiD, tokenIdentifier.getMasterKeyId());
+ }
+
+ private static Connection getTestDBConnection() {
+ try {
+ return DriverManager.getConnection(CONNECTION_URL);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void createTestDBTables() throws SQLException {
+ execute("CREATE TABLE Tokens (sequenceNum INT NOT NULL, "
+ + "tokenIdentifier VARCHAR (255) FOR BIT DATA NOT NULL, "
+ + "tokenInfo VARCHAR (255) FOR BIT DATA NOT NULL, "
+ + "modifiedTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
+ + "PRIMARY KEY(sequenceNum))");
+ execute("CREATE TABLE DelegationKeys (keyId INT NOT NULL, "
+ + "delegationKey VARCHAR (255) FOR BIT DATA NOT NULL, "
+ + "modifiedTime TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
+ + "PRIMARY KEY(keyId))");
+ execute("CREATE TABLE LastSequenceNum (sequenceNum INT NOT NULL)");
+ execute("INSERT INTO LastSequenceNum VALUES (0)");
+ execute("CREATE TABLE LastDelegationKeyId (keyId INT NOT NULL)");
+ execute("INSERT INTO LastDelegationKeyId VALUES (0)");
+ }
+
+ private static void dropTestDBTables() throws SQLException {
+ execute("DROP TABLE Tokens");
+ execute("DROP TABLE DelegationKeys");
+ execute("DROP TABLE LastSequenceNum");
+ execute("DROP TABLE LastDelegationKeyId");
+ }
+
+ /**
+  * Runs a single SQL statement against the test database on a dedicated connection.
+  *
+  * @param statement SQL text to execute
+  * @throws SQLException if the statement cannot be created or executed
+  */
+ private static void execute(String statement) throws SQLException {
+   // Both the connection and the statement are closed explicitly, in reverse order,
+   // instead of relying on the connection to clean up the statement it created.
+   try (Connection connection = getTestDBConnection();
+       java.sql.Statement sqlStatement = connection.createStatement()) {
+     sqlStatement.execute(statement);
+   }
+ }
+
+ /**
+  * Shuts down the test secret manager behind the given token manager: releases the
+  * key-roll lock if the calling thread holds it, then stops the manager's threads.
+  */
+ private void stopTokenManager(DelegationTokenManager tokenManager) {
+   TestDelegationTokenSecretManager sqlSecretManager =
+       (TestDelegationTokenSecretManager) tokenManager.getDelegationTokenSecretManager();
+
+   // Let go of the key-roll lock first so nothing stays blocked on it
+   sqlSecretManager.unlockKeyRoll();
+
+   // Stopping the threads is what closes the database connections
+   sqlSecretManager.stopThreads();
+ }
+
+ static class TestDelegationTokenSecretManager extends SQLDelegationTokenSecretManagerImpl {
+ private ReentrantLock keyRollLock;
+
+ private synchronized ReentrantLock getKeyRollLock() {
+ if (keyRollLock == null) {
+ keyRollLock = new ReentrantLock();
+ }
+ return keyRollLock;
+ }
+
+ TestDelegationTokenSecretManager() {
+ super(conf, new TestConnectionFactory(conf),
+ SQLSecretManagerRetriableHandlerImpl.getInstance(conf, new TestRetryHandler()));
+ }
+
+ // Tests can call this method to prevent delegation keys from
+ // being rolled in the middle of a test to prevent race conditions
+ public void lockKeyRoll() {
+ getKeyRollLock().lock();
+ }
+
+ public void unlockKeyRoll() {
+ if (getKeyRollLock().isHeldByCurrentThread()) {
+ getKeyRollLock().unlock();
+ }
+ }
+
+ @Override
+ protected void rollMasterKey() throws IOException {
+ try {
+ lockKeyRoll();
+ super.rollMasterKey();
+ } finally {
+ unlockKeyRoll();
+ }
+ }
+
+ public void setReadOnly(boolean readOnly) {
+ ((TestConnectionFactory) getConnectionFactory()).readOnly = readOnly;
+ }
+ }
+
+ /**
+  * Connection factory used by the tests. Every connection it returns is switched to the
+  * APP schema and has its read-only mode set from the {@code readOnly} flag.
+  */
+ static class TestConnectionFactory extends HikariDataSourceConnectionFactory {
+   // Applied to each connection in getConnection(); defaults to false
+   private boolean readOnly;
+
+   TestConnectionFactory(Configuration conf) {
+     super(conf);
+   }
+
+   @Override
+   public Connection getConnection() throws SQLException {
+     Connection conn = super.getConnection();
+     // The derby driver looks for a user schema, so select the default one explicitly
+     conn.setSchema("APP");
+     conn.setReadOnly(readOnly);
+     return conn;
+   }
+ }
+
+ /**
+  * Retry handler used by the tests. It counts every call to
+  * {@link #execute(SQLCommandVoid)} before delegating to the parent implementation,
+  * whether or not the command goes on to succeed.
+  */
+ static class TestRetryHandler extends SQLSecretManagerRetriableHandlerImpl {
+   // Single shared counter. It is final and reset in place so that every thread always
+   // updates the same instance; swapping in a new AtomicInteger through a plain static
+   // field could leave another thread incrementing the discarded one.
+   private static final AtomicInteger executionAttemptCounter = new AtomicInteger();
+
+   /** Sets the number of recorded execution attempts back to zero. */
+   static void resetExecutionAttemptCounter() {
+     executionAttemptCounter.set(0);
+   }
+
+   /** Returns the number of execution attempts recorded since the last reset. */
+   static int getExecutionAttempts() {
+     return executionAttemptCounter.get();
+   }
+
+   @Override
+   public void execute(SQLCommandVoid command) throws SQLException {
+     // Count first so that attempts which throw are still recorded
+     executionAttemptCounter.incrementAndGet();
+     super.execute(command);
+   }
+ }
+}
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 9e12edaf55a..550c716d485 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -129,6 +129,8 @@
<jcache.version>1.0-alpha-1</jcache.version>
<ehcache.version>3.3.1</ehcache.version>
<hikari.version>4.0.3</hikari.version>
+ <derby.version>10.10.2.0</derby.version>
+ <mysql-connector-java.version>8.0.29</mysql-connector-java.version>
<mssql.version>6.2.1.jre7</mssql.version>
<okhttp3.version>4.10.0</okhttp3.version>
<okio.version>3.2.0</okio.version>
@@ -1809,6 +1811,16 @@
<artifactId>HikariCP</artifactId>
<version>${hikari.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.derby</groupId>
+ <artifactId>derby</artifactId>
+ <version>${derby.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql-connector-java.version}</version>
+ </dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org