You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2023/02/09 02:10:14 UTC

[ozone] branch master updated: HDDS-7815. Extract S3 secret layer (#4196)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bb74438fa HDDS-7815. Extract S3 secret layer (#4196)
1bb74438fa is described below

commit 1bb74438fad30c96b699bcbb7c27b695a0b7a072
Author: Mikhail <Po...@users.noreply.github.com>
AuthorDate: Thu Feb 9 05:10:09 2023 +0300

    HDDS-7815. Extract S3 secret layer (#4196)
---
 .../java/org/apache/hadoop/ozone/om/S3Batcher.java |  46 +++++++++
 .../org/apache/hadoop/ozone/om/S3SecretCache.java  |  39 ++++++++
 .../apache/hadoop/ozone/om/S3SecretFunction.java   |  28 ++++++
 .../hadoop/ozone/om/S3SecretLockedManager.java     | 100 +++++++++++++++++++
 .../apache/hadoop/ozone/om/S3SecretManager.java    |  81 ++++++++++++++-
 .../org/apache/hadoop/ozone/om/S3SecretStore.java  |  56 +++++++++++
 .../ozone/client/rpc/TestSecureOzoneRpcClient.java |  16 +--
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   9 +-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  59 ++++++++++-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  13 ++-
 .../hadoop/ozone/om/S3SecretManagerImpl.java       |  93 +++++++++--------
 .../om/request/s3/security/OMSetSecretRequest.java |  85 +++++++---------
 .../om/request/s3/security/S3GetSecretRequest.java | 110 ++++++++++-----------
 .../request/s3/security/S3RevokeSecretRequest.java |  45 ++++-----
 .../tenant/OMTenantAssignUserAccessIdRequest.java  |  39 +++-----
 .../tenant/OMTenantRevokeUserAccessIdRequest.java  |  20 +---
 .../response/s3/security/OMSetSecretResponse.java  |  12 ++-
 .../response/s3/security/S3GetSecretResponse.java  |  20 +++-
 .../s3/security/S3RevokeSecretResponse.java        |  14 ++-
 .../tenant/OMTenantAssignUserAccessIdResponse.java |  21 ++--
 .../tenant/OMTenantRevokeUserAccessIdResponse.java |  13 ++-
 .../OzoneDelegationTokenSecretManager.java         |  13 ++-
 .../s3/security/TestS3GetSecretRequest.java        |  13 ++-
 .../hadoop/ozone/security/S3SecretStoreMap.java    |  57 +++++++++++
 .../TestOzoneDelegationTokenSecretManager.java     |  91 ++++++++---------
 25 files changed, 768 insertions(+), 325 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3Batcher.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3Batcher.java
new file mode 100644
index 0000000000..c2cc341a46
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3Batcher.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
+
+import java.io.IOException;
+
+/**
+ * Batches S3 secret add and delete operations on a provided batch operator.
+ */
+public interface S3Batcher {
+  /**
+   * Adds the given S3 secret to the provided batch.
+   * @param batchOperator batch operator to add the operation to.
+   * @param id entity id.
+   * @param s3SecretValue s3 secret value.
+   * @throws IOException if the batch operation fails.
+   */
+  void addWithBatch(AutoCloseable batchOperator, String id,
+                    S3SecretValue s3SecretValue)
+      throws IOException;
+
+  /**
+   * Adds a delete of the given entity to the provided batch.
+   * @param batchOperator batch operator to add the operation to.
+   * @param id entity id.
+   * @throws IOException if the batch operation fails.
+   */
+  void deleteWithBatch(AutoCloseable batchOperator, String id)
+      throws IOException;
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretCache.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretCache.java
new file mode 100644
index 0000000000..a44250c57f
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretCache.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
+
+/**
+ * Cache layer of S3 secrets.
+ */
+public interface S3SecretCache {
+  /**
+   * Puts the secret value into the cache.
+   * @param id secret value identifier.
+   * @param secretValue secret value.
+   * @param txId lifetime identifier.
+   */
+  void put(String id, S3SecretValue secretValue, long txId);
+
+  /**
+   * Invalidates the cached secret value with the provided identifier.
+   * @param id secret identifier.
+   * @param txId lifetime identifier.
+   */
+  void invalidate(String id, long txId);
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretFunction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretFunction.java
new file mode 100644
index 0000000000..8bd0d03e94
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretFunction.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import java.io.IOException;
+
+/**
+ * Functional interface for actions run on an S3 secret manager under lock.
+ * @param <T> type of the action result.
+ */
+@FunctionalInterface
+public interface S3SecretFunction<T> {
+  T accept(S3SecretManager s3SecretManager) throws IOException;
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretLockedManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretLockedManager.java
new file mode 100644
index 0000000000..d5349a00bc
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretLockedManager.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.S3_SECRET_LOCK;
+
+/**
+ * {@link S3SecretManager} wrapper that performs secret operations under lock.
+ */
+public class S3SecretLockedManager implements S3SecretManager {
+  private final S3SecretManager secretManager;
+  private final IOzoneManagerLock lock;
+
+  public S3SecretLockedManager(S3SecretManager secretManager,
+                               IOzoneManagerLock lock) {
+    this.secretManager = secretManager;
+    this.lock = lock;
+  }
+
+  @Override
+  public S3SecretValue getSecret(String kerberosID) throws IOException {
+    lock.acquireWriteLock(S3_SECRET_LOCK, kerberosID);
+    try {
+      return secretManager.getSecret(kerberosID);
+    } finally {
+      lock.releaseWriteLock(S3_SECRET_LOCK, kerberosID);
+    }
+  }
+
+  @Override
+  public String getSecretString(String awsAccessKey) throws IOException {
+    lock.acquireReadLock(S3_SECRET_LOCK, awsAccessKey);
+    try {
+      return secretManager.getSecretString(awsAccessKey);
+    } finally {
+      lock.releaseReadLock(S3_SECRET_LOCK, awsAccessKey);
+    }
+  }
+
+  @Override
+  public void storeSecret(String kerberosId, S3SecretValue secretValue)
+      throws IOException {
+    lock.acquireWriteLock(S3_SECRET_LOCK, kerberosId);
+    try {
+      secretManager.storeSecret(kerberosId, secretValue);
+    } finally {
+      lock.releaseWriteLock(S3_SECRET_LOCK, kerberosId);
+    }
+  }
+
+  @Override
+  public void revokeSecret(String kerberosId) throws IOException {
+    lock.acquireWriteLock(S3_SECRET_LOCK, kerberosId);
+    try {
+      secretManager.revokeSecret(kerberosId);
+    } finally {
+      lock.releaseWriteLock(S3_SECRET_LOCK, kerberosId);
+    }
+  }
+
+  @Override
+  public <T> T doUnderLock(String lockId, S3SecretFunction<T> action)
+      throws IOException {
+    lock.acquireWriteLock(S3_SECRET_LOCK, lockId);
+    try {
+      return action.accept(secretManager); // wrapped manager, not this
+    } finally {
+      lock.releaseWriteLock(S3_SECRET_LOCK, lockId);
+    }
+  }
+
+  @Override
+  public S3Batcher batcher() {
+    return secretManager.batcher(); // delegated without locking
+  }
+
+  @Override
+  public S3SecretCache cache() {
+    return secretManager.cache(); // delegated without locking
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretManager.java
index 3ca8cbbb21..6685d24ed4 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretManager.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretManager.java
@@ -19,18 +19,93 @@
 package org.apache.hadoop.ozone.om;
 
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+
 /**
  * Interface to manager s3 secret.
  */
 public interface S3SecretManager {
+  Logger LOG = LoggerFactory.getLogger(S3SecretManager.class);
 
-  S3SecretValue getS3Secret(String kerberosID) throws IOException;
+  /**
+   * API to get s3 secret for given kerberos identifier.
+   * @param kerberosID kerberos principal.
+   * @return associated s3 secret or null if no secret exists.
+   * @throws IOException if reading the secret fails.
+   */
+  S3SecretValue getSecret(String kerberosID) throws IOException;
 
   /**
    * API to get s3 secret for given awsAccessKey.
-   * @param awsAccessKey
+   * @param awsAccessKey aws access key.
    * */
-  String getS3UserSecretString(String awsAccessKey) throws IOException;
+  String getSecretString(String awsAccessKey) throws IOException;
+
+  /**
+   * Store provided s3 secret and associate it with kerberos principal.
+   * @param kerberosId kerberos principal.
+   * @param secretValue s3 secret value.
+   * @throws IOException in case when secret storing failed.
+   */
+  void storeSecret(String kerberosId, S3SecretValue secretValue)
+      throws IOException;
+
+  /**
+   * Revoke s3 secret which associated with provided kerberos principal.
+   * @param kerberosId kerberos principal.
+   * @throws IOException in case when revoke failed.
+   */
+  void revokeSecret(String kerberosId) throws IOException;
+
+  /**
+   * Apply provided action under write lock.
+   * @param lockId lock identifier.
+   * @param action custom action.
+   * @param <T> type of action result.
+   * @return action result.
+   * @throws IOException in case when action failed.
+   */
+  <T> T doUnderLock(String lockId, S3SecretFunction<T> action)
+      throws IOException;
+
+  /**
+   * Default implementation of secret check method.
+   * @param kerberosId kerberos principal.
+   * @return true if an s3 secret is associated with {@code kerberosId},
+   * false if not.
+   */
+  default boolean hasS3Secret(String kerberosId) throws IOException {
+    return getSecret(kerberosId) != null;
+  }
+
+  S3Batcher batcher();
+
+  default boolean isBatchSupported() {
+    return batcher() != null;
+  }
+
+  /**
+   * Direct cache accessor.
+   * @return s3 secret cache.
+   */
+  S3SecretCache cache();
+
+  default void updateCache(String accessId, S3SecretValue secret, long txId) {
+    S3SecretCache cache = cache();
+    if (cache != null) {
+      cache.put(accessId, secret, txId);
+    }
+  }
+
+  default void invalidateCacheEntry(String id, long txId) {
+    S3SecretCache cache = cache();
+    if (cache != null) {
+      cache.invalidate(id, txId);
+    }
+  }
+
+
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretStore.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretStore.java
new file mode 100644
index 0000000000..5d54d7a5c6
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretStore.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
+
+import java.io.IOException;
+
+/**
+ * S3 secret store interface.
+ */
+public interface S3SecretStore {
+
+  /**
+   * Stores the provided s3 secret under the given kerberos ID.
+   * @param kerberosId kerberos ID.
+   * @param secret s3 secret.
+   * @throws IOException if storing the secret fails.
+   */
+  void storeSecret(String kerberosId, S3SecretValue secret)
+      throws IOException;
+
+  /**
+   * Gets the s3 secret associated with the provided kerberos ID.
+   * @param kerberosID kerberos ID.
+   * @return s3 secret value or null if no s3 secret is found.
+   * @throws IOException if reading the secret fails.
+   */
+  S3SecretValue getSecret(String kerberosID) throws IOException;
+
+  /**
+   * Revokes the s3 secret associated with the provided kerberos ID.
+   * @param kerberosId kerberos ID.
+   * @throws IOException if revoking the secret fails.
+   */
+  void revokeSecret(String kerberosId) throws IOException;
+
+  /**
+   * @return s3 batcher instance, null if batch operations are not supported.
+   */
+  S3Batcher batcher();
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
index 8190b88b38..f8070eb68d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.S3SecretManager;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@@ -296,9 +297,12 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
 
     String accessKey = UserGroupInformation.getCurrentUser().getUserName();
 
+    S3SecretManager s3SecretManager = cluster.getOzoneManager()
+        .getS3SecretManager();
+
     // Add secret to S3Secret table.
-    cluster.getOzoneManager().getMetadataManager().getS3SecretTable().put(
-        accessKey, new S3SecretValue(accessKey, secret));
+    s3SecretManager.storeSecret(accessKey,
+        new S3SecretValue(accessKey, secret));
 
     OMRequest writeRequest = OMRequest.newBuilder()
         .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
@@ -345,16 +349,16 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
     Assert.assertEquals(accessKey, volumeInfo.getAdminName());
     Assert.assertEquals(accessKey, volumeInfo.getOwnerName());
 
-    // Override secret to S3Secret table with some dummy value
-    cluster.getOzoneManager().getMetadataManager().getS3SecretTable().put(
-        accessKey, new S3SecretValue(accessKey, "dummy"));
+    // Override secret to S3Secret store with some dummy value
+    s3SecretManager
+        .storeSecret(accessKey, new S3SecretValue(accessKey, "dummy"));
 
     // Write request with invalid credentials.
     omResponse = cluster.getOzoneManager().getOmServerProtocol()
         .submitRequest(null, writeRequest);
     Assert.assertTrue(omResponse.getStatus() == Status.INVALID_TOKEN);
 
-  // Read request with invalid credentials.
+    // Read request with invalid credentials.
     omResponse = cluster.getOzoneManager().getOmServerProtocol()
         .submitRequest(null, readRequest);
     Assert.assertTrue(omResponse.getStatus() == Status.INVALID_TOKEN);
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 539a6daf6f..8ec576f8ad 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with this
  * work for additional information regarding copyright ownership.  The ASF
@@ -39,7 +39,6 @@ import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
@@ -366,12 +365,6 @@ public interface OMMetadataManager extends DBStoreHAManager {
    */
   Table<String, OmMultipartKeyInfo> getMultipartInfoTable();
 
-  /**
-   * Gets the S3 Secrets table.
-   * @return Table
-   */
-  Table<String, S3SecretValue> getS3SecretTable();
-
   Table<String, TransactionInfo> getTransactionInfoTable();
 
   Table<String, OmDBAccessIdInfo> getTenantAccessIdTable();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 127e3ceb12..f079030e64 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with this
  * work for additional information regarding copyright ownership.  The ASF
@@ -31,9 +31,11 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
+import com.google.common.base.Optional;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.TableCacheMetrics;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 import org.apache.hadoop.hdds.utils.db.RocksDBConfiguration;
@@ -112,7 +114,8 @@ import org.slf4j.LoggerFactory;
 /**
  * Ozone metadata manager interface.
  */
-public class OmMetadataManagerImpl implements OMMetadataManager {
+public class OmMetadataManagerImpl implements OMMetadataManager,
+    S3SecretStore, S3SecretCache {
   private static final Logger LOG =
       LoggerFactory.getLogger(OmMetadataManagerImpl.class);
 
@@ -246,7 +249,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   private Table deletedTable;
   private Table openKeyTable;
   private Table<String, OmMultipartKeyInfo> multipartInfoTable;
-  private Table s3SecretTable;
+  private Table<String, S3SecretValue> s3SecretTable;
   private Table dTokenTable;
   private Table prefixTable;
   private Table dirTable;
@@ -1511,8 +1514,54 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   }
 
   @Override
-  public Table<String, S3SecretValue> getS3SecretTable() {
-    return s3SecretTable;
+  public void storeSecret(String kerberosId, S3SecretValue secret)
+      throws IOException {
+    s3SecretTable.put(kerberosId, secret);
+  }
+
+  @Override
+  public S3SecretValue getSecret(String kerberosID) throws IOException {
+    return s3SecretTable.get(kerberosID);
+  }
+
+  @Override
+  public void revokeSecret(String kerberosId) throws IOException {
+    s3SecretTable.delete(kerberosId);
+  }
+
+  @Override
+  public void put(String kerberosId, S3SecretValue secretValue, long txId) {
+    s3SecretTable.addCacheEntry(new CacheKey<>(kerberosId),
+        new CacheValue<>(Optional.of(secretValue), txId));
+  }
+
+  @Override
+  public void invalidate(String id, long txId) {
+    s3SecretTable.addCacheEntry(new CacheKey<>(id),
+        new CacheValue<>(Optional.absent(), txId));
+  }
+
+  @Override
+  public S3Batcher batcher() {
+    return new S3Batcher() {
+      @Override
+      public void addWithBatch(AutoCloseable batchOperator,
+                               String id, S3SecretValue s3SecretValue)
+          throws IOException {
+        if (batchOperator instanceof BatchOperation) {
+          s3SecretTable.putWithBatch((BatchOperation) batchOperator,
+              id, s3SecretValue);
+        }
+      }
+
+      @Override
+      public void deleteWithBatch(AutoCloseable batchOperator, String id)
+          throws IOException {
+        if (batchOperator instanceof BatchOperation) {
+          s3SecretTable.deleteWithBatch((BatchOperation) batchOperator, id);
+        }
+      }
+    };
   }
 
   @Override
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 67803786df..123c550aca 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -736,7 +736,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   private void instantiateServices(boolean withNewSnapshot) throws IOException {
 
-    metadataManager = new OmMetadataManagerImpl(configuration);
+    OmMetadataManagerImpl metadataManagerImpl =
+        new OmMetadataManagerImpl(configuration);
+    this.metadataManager = metadataManagerImpl;
     LOG.info("S3 Multi-Tenancy is {}",
         isS3MultiTenancyEnabled ? "enabled" : "disabled");
     if (isS3MultiTenancyEnabled) {
@@ -745,8 +747,11 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     }
     volumeManager = new VolumeManagerImpl(metadataManager);
     bucketManager = new BucketManagerImpl(metadataManager);
+    s3SecretManager = new S3SecretLockedManager(
+        new S3SecretManagerImpl(metadataManagerImpl, metadataManagerImpl),
+        metadataManager.getLock()
+    );
     if (secConfig.isSecurityEnabled() || testSecureOmFlag) {
-      s3SecretManager = new S3SecretManagerImpl(configuration, metadataManager);
       delegationTokenMgr = createDelegationTokenSecretManager(configuration);
     }
 
@@ -1482,6 +1487,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return metadataManager;
   }
 
+  public S3SecretManager getS3SecretManager() {
+    return s3SecretManager;
+  }
+
   /**
    * Get snapshot manager.
    *
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java
index d9e7384677..79c46f5e42 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java
@@ -19,9 +19,6 @@
 package org.apache.hadoop.ozone.om;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.hdds.security.OzoneSecurityException;
 import org.apache.logging.log4j.util.Strings;
@@ -30,7 +27,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
-import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.S3_SECRET_LOCK;
 import static org.apache.hadoop.hdds.security.OzoneSecurityException.ResultCodes.S3_SECRET_NOT_FOUND;
 
 /**
@@ -39,71 +35,72 @@ import static org.apache.hadoop.hdds.security.OzoneSecurityException.ResultCodes
 public class S3SecretManagerImpl implements S3SecretManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(S3SecretManagerImpl.class);
-  /**
-   * OMMetadataManager is used for accessing OM MetadataDB and ReadWriteLock.
-   */
-  private final OMMetadataManager omMetadataManager;
-  private final OzoneConfiguration configuration;
+
+  private final S3SecretStore s3SecretStore;
+  private final S3SecretCache s3SecretCache;
 
   /**
    * Constructs S3SecretManager.
-   *
-   * @param omMetadataManager
+   * @param s3SecretStore s3 secret store.
+   * @param s3SecretCache s3 secret cache.
    */
-  public S3SecretManagerImpl(OzoneConfiguration configuration,
-      OMMetadataManager omMetadataManager) {
-    this.configuration = configuration;
-    this.omMetadataManager = omMetadataManager;
+  public S3SecretManagerImpl(S3SecretStore s3SecretStore,
+                             S3SecretCache s3SecretCache) {
+    this.s3SecretStore = s3SecretStore;
+    this.s3SecretCache = s3SecretCache;
   }
 
   @Override
-  public S3SecretValue getS3Secret(String kerberosID) throws IOException {
+  public S3SecretValue getSecret(String kerberosID) throws IOException {
     Preconditions.checkArgument(Strings.isNotBlank(kerberosID),
         "kerberosID cannot be null or empty.");
-    S3SecretValue result = null;
-    omMetadataManager.getLock().acquireWriteLock(S3_SECRET_LOCK, kerberosID);
-    try {
-      S3SecretValue s3Secret =
-          omMetadataManager.getS3SecretTable().get(kerberosID);
-      if (s3Secret == null) {
-        byte[] secret = OmUtils.getSHADigest();
-        result = new S3SecretValue(kerberosID, DigestUtils.sha256Hex(secret));
-        omMetadataManager.getS3SecretTable().put(kerberosID, result);
-      } else {
-        return s3Secret;
-      }
-    } finally {
-      omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK, kerberosID);
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Secret for accessKey:{}, proto:{}", kerberosID, result);
-    }
-    return result;
+    return s3SecretStore.getSecret(kerberosID);
   }
 
   @Override
-  public String getS3UserSecretString(String awsAccessKey)
+  public String getSecretString(String awsAccessKey)
       throws IOException {
     Preconditions.checkArgument(Strings.isNotBlank(awsAccessKey),
         "awsAccessKeyId cannot be null or empty.");
     LOG.trace("Get secret for awsAccessKey:{}", awsAccessKey);
 
-    S3SecretValue s3Secret;
-    omMetadataManager.getLock().acquireReadLock(S3_SECRET_LOCK, awsAccessKey);
-    try {
-      s3Secret = omMetadataManager.getS3SecretTable().get(awsAccessKey);
-      if (s3Secret == null) {
-        throw new OzoneSecurityException("S3 secret not found for " +
-            "awsAccessKeyId " + awsAccessKey, S3_SECRET_NOT_FOUND);
-      }
-    } finally {
-      omMetadataManager.getLock().releaseReadLock(S3_SECRET_LOCK, awsAccessKey);
+    S3SecretValue s3Secret = s3SecretStore.getSecret(awsAccessKey);
+    if (s3Secret == null) {
+      throw new OzoneSecurityException("S3 secret not found for " +
+          "awsAccessKeyId " + awsAccessKey, S3_SECRET_NOT_FOUND);
     }
 
     return s3Secret.getAwsSecret();
   }
 
-  public OMMetadataManager getOmMetadataManager() {
-    return omMetadataManager;
+  @Override
+  public void storeSecret(String kerberosId, S3SecretValue secretValue)
+      throws IOException {
+    s3SecretStore.storeSecret(kerberosId, secretValue);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Secret for accessKey:{} stored", kerberosId);
+    }
+  }
+
+  @Override
+  public void revokeSecret(String kerberosId) throws IOException {
+    s3SecretStore.revokeSecret(kerberosId);
+  }
+
+  @Override
+  public <T> T doUnderLock(String lockId, S3SecretFunction<T> action)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "Lock on locked secret manager is not supported.");
+  }
+
+  @Override
+  public S3SecretCache cache() {
+    return s3SecretCache;
+  }
+
+  @Override
+  public S3Batcher batcher() {
+    return s3SecretStore.batcher();
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java
index 81abd8df21..704d302208 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/OMSetSecretRequest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,10 +18,7 @@
 
 package org.apache.hadoop.ozone.om.request.s3.security;
 
-import com.google.common.base.Optional;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
-import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.OMAction;
@@ -47,8 +44,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.S3_SECRET_LOCK;
-
 /**
  * Handles SetSecret request.
  */
@@ -77,7 +72,7 @@ public class OMSetSecretRequest extends OMClientRequest {
 
     if (accessIdInfo == null) {
       // Check (old) S3SecretTable
-      if (omMetadataManager.getS3SecretTable().get(accessId) == null) {
+      if (!ozoneManager.getS3SecretManager().hasS3Secret(accessId)) {
         throw new OMException("accessId '" + accessId + "' not found.",
             OMException.ResultCodes.ACCESS_ID_NOT_FOUND);
       }
@@ -87,7 +82,7 @@ public class OMSetSecretRequest extends OMClientRequest {
     final String secretKey = request.getSecretKey();
     if (StringUtils.isEmpty(secretKey)) {
       throw new OMException("Secret key should not be empty",
-              OMException.ResultCodes.INVALID_REQUEST);
+          OMException.ResultCodes.INVALID_REQUEST);
     }
 
     if (secretKey.length() < OzoneConsts.S3_SECRET_KEY_MIN_LENGTH) {
@@ -106,53 +101,49 @@ public class OMSetSecretRequest extends OMClientRequest {
 
   @Override
   public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
-      long transactionLogIndex,
-      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
-
+         long transactionLogIndex,
+         OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
     OMClientResponse omClientResponse = null;
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
-    boolean acquiredLock = false;
     IOException exception = null;
-    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
 
     final SetS3SecretRequest request = getOmRequest().getSetS3SecretRequest();
     final String accessId = request.getAccessId();
     final String secretKey = request.getSecretKey();
 
     try {
-      acquiredLock = omMetadataManager.getLock().acquireWriteLock(
-          S3_SECRET_LOCK, accessId);
-
-      // Intentionally set to final so they can only be set once.
-      final S3SecretValue newS3SecretValue;
-
-      // Update legacy S3SecretTable, if the accessId entry exists
-      if (omMetadataManager.getS3SecretTable().get(accessId) != null) {
-        // accessId found in S3SecretTable. Update S3SecretTable
-        LOG.debug("Updating S3SecretTable cache entry");
-        // Update S3SecretTable cache entry in this case
-        newS3SecretValue = new S3SecretValue(accessId, secretKey);
-
-        omMetadataManager.getS3SecretTable().addCacheEntry(
-            new CacheKey<>(accessId),
-            new CacheValue<>(Optional.of(newS3SecretValue),
-                transactionLogIndex));
-      } else {
-        // If S3SecretTable is not updated, throw ACCESS_ID_NOT_FOUND exception.
-        throw new OMException("accessId '" + accessId + "' not found.",
-            OMException.ResultCodes.ACCESS_ID_NOT_FOUND);
-      }
-
-      // Compose response
-      final SetS3SecretResponse.Builder setSecretResponse =
-          SetS3SecretResponse.newBuilder()
-              .setAccessId(accessId)
-              .setSecretKey(secretKey);
-
-      omClientResponse = new OMSetSecretResponse(accessId, newS3SecretValue,
-          omResponse.setSetS3SecretResponse(setSecretResponse).build());
-
+      omClientResponse = ozoneManager.getS3SecretManager()
+          .doUnderLock(accessId, s3SecretManager -> {
+            // Intentionally final so it can only be assigned once.
+            final S3SecretValue newS3SecretValue;
+
+            // Update legacy S3SecretTable, if the accessId entry exists
+            if (s3SecretManager.hasS3Secret(accessId)) {
+              // accessId found in S3SecretTable. Update S3SecretTable
+              LOG.debug("Updating S3SecretTable cache entry");
+              // Update S3SecretTable cache entry in this case
+              newS3SecretValue = new S3SecretValue(accessId, secretKey);
+
+              s3SecretManager
+                  .updateCache(accessId, newS3SecretValue, transactionLogIndex);
+            } else {
+              // No existing secret for this accessId,
+              // so throw ACCESS_ID_NOT_FOUND exception.
+              throw new OMException("accessId '" + accessId + "' not found.",
+                  OMException.ResultCodes.ACCESS_ID_NOT_FOUND);
+            }
+
+            // Compose response
+            final SetS3SecretResponse.Builder setSecretResponse =
+                SetS3SecretResponse.newBuilder()
+                    .setAccessId(accessId)
+                    .setSecretKey(secretKey);
+
+            return new OMSetSecretResponse(accessId, newS3SecretValue,
+                s3SecretManager,
+                omResponse.setSetS3SecretResponse(setSecretResponse).build());
+          });
     } catch (IOException ex) {
       exception = ex;
       omClientResponse = new OMSetSecretResponse(
@@ -160,10 +151,6 @@ public class OMSetSecretRequest extends OMClientRequest {
     } finally {
       addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
           ozoneManagerDoubleBufferHelper);
-      if (acquiredLock) {
-        omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK,
-            accessId);
-      }
     }
 
     final Map<String, String> auditMap = new HashMap<>();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3GetSecretRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3GetSecretRequest.java
index fe595ea9ec..f083ba0d93 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3GetSecretRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3GetSecretRequest.java
@@ -21,8 +21,8 @@ package org.apache.hadoop.ozone.om.request.s3.security;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.base.Optional;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.slf4j.Logger;
@@ -32,7 +32,6 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.OMAction;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@@ -47,10 +46,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UpdateGetS3SecretRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3Secret;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
-import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
-
-import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.S3_SECRET_LOCK;
 
 /**
  * Handles GetS3Secret request.
@@ -136,9 +131,7 @@ public class S3GetSecretRequest extends OMClientRequest {
     OMClientResponse omClientResponse = null;
     OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
         getOmRequest());
-    boolean acquiredLock = false;
     IOException exception = null;
-    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
 
     final GetS3SecretRequest getS3SecretRequest =
             getOmRequest().getGetS3SecretRequest();
@@ -146,72 +139,71 @@ public class S3GetSecretRequest extends OMClientRequest {
     final boolean createIfNotExist = getS3SecretRequest.getCreateIfNotExist();
     // See Note 1 above
     final String accessId = getS3SecretRequest.getKerberosID();
-    String awsSecret = null;
+    AtomicReference<String> awsSecret = new AtomicReference<>();
     if (createIfNotExist) {
       final UpdateGetS3SecretRequest updateGetS3SecretRequest =
               getOmRequest().getUpdateGetS3SecretRequest();
-      awsSecret = updateGetS3SecretRequest.getAwsSecret();
+      awsSecret.set(updateGetS3SecretRequest.getAwsSecret());
       assert (accessId.equals(updateGetS3SecretRequest.getKerberosID()));
     }
 
     try {
-      acquiredLock = omMetadataManager.getLock()
-          .acquireWriteLock(S3_SECRET_LOCK, accessId);
-
-      final S3SecretValue assignS3SecretValue;
-      final S3SecretValue s3SecretValue =
-          omMetadataManager.getS3SecretTable().get(accessId);
-
-      if (s3SecretValue == null) {
-        // Not found in S3SecretTable.
-        if (createIfNotExist) {
-          // Add new entry in this case
-          assignS3SecretValue = new S3SecretValue(accessId, awsSecret);
-          // Add cache entry first.
-          omMetadataManager.getS3SecretTable().addCacheEntry(
-                  new CacheKey<>(accessId),
-                  new CacheValue<>(Optional.of(assignS3SecretValue),
-                          transactionLogIndex));
-        } else {
-          assignS3SecretValue = null;
-        }
-      } else {
-        // Found in S3SecretTable.
-        awsSecret = s3SecretValue.getAwsSecret();
-        assignS3SecretValue = null;
-      }
-
-      // Throw ACCESS_ID_NOT_FOUND to the client if accessId doesn't exist
-      //  when createIfNotExist is false.
-      if (awsSecret == null) {
-        assert (!createIfNotExist);
-        throw new OMException("accessId '" + accessId + "' doesn't exist",
-                OMException.ResultCodes.ACCESS_ID_NOT_FOUND);
-      }
-
-      // Compose response
-      final GetS3SecretResponse.Builder getS3SecretResponse =
-              GetS3SecretResponse.newBuilder().setS3Secret(
-                      S3Secret.newBuilder()
-                              .setAwsSecret(awsSecret)
-                              .setKerberosID(accessId)  // See Note 1 above
-              );
-      // If entry exists or createIfNotExist is false, assignS3SecretValue
-      // will be null, so we won't write or overwrite the entry.
-      omClientResponse = new S3GetSecretResponse(assignS3SecretValue,
-              omResponse.setGetS3SecretResponse(getS3SecretResponse).build());
+      omClientResponse = ozoneManager.getS3SecretManager()
+          .doUnderLock(accessId, s3SecretManager -> {
+            S3SecretValue assignS3SecretValue;
+            S3SecretValue s3SecretValue =
+                s3SecretManager.getSecret(accessId);
+
+            if (s3SecretValue == null) {
+              // Not found in S3SecretTable.
+              if (createIfNotExist) {
+                // Add new entry in this case
+                assignS3SecretValue =
+                    new S3SecretValue(accessId, awsSecret.get());
+                // Add cache entry first.
+                s3SecretManager.updateCache(accessId,
+                    assignS3SecretValue,
+                    transactionLogIndex);
+              } else {
+                assignS3SecretValue = null;
+              }
+            } else {
+              // Found in S3SecretTable.
+              awsSecret.set(s3SecretValue.getAwsSecret());
+              assignS3SecretValue = null;
+            }
+
+            // Throw ACCESS_ID_NOT_FOUND to the client if accessId doesn't exist
+            // when createIfNotExist is false.
+            if (awsSecret.get() == null) {
+              assert (!createIfNotExist);
+              throw new OMException("accessId '" + accessId + "' doesn't exist",
+                  OMException.ResultCodes.ACCESS_ID_NOT_FOUND);
+            }
+
+            // Compose response
+            final GetS3SecretResponse.Builder getS3SecretResponse =
+                GetS3SecretResponse.newBuilder().setS3Secret(
+                    S3Secret.newBuilder()
+                        .setAwsSecret(awsSecret.get())
+                        .setKerberosID(accessId)  // See Note 1 above
+                );
+            // If entry exists or createIfNotExist is false, assignS3SecretValue
+            // will be null, so we won't write or overwrite the entry.
+            return new S3GetSecretResponse(assignS3SecretValue,
+                s3SecretManager,
+                omResponse.setGetS3SecretResponse(getS3SecretResponse).build());
+          });
+
 
     } catch (IOException ex) {
       exception = ex;
       omClientResponse = new S3GetSecretResponse(null,
+          ozoneManager.getS3SecretManager(),
           createErrorOMResponse(omResponse, ex));
     } finally {
       addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
           ozoneManagerDoubleBufferHelper);
-      if (acquiredLock) {
-        omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK,
-            accessId);
-      }
     }
 
     Map<String, String> auditMap = new HashMap<>();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3RevokeSecretRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3RevokeSecretRequest.java
index 314ab1ec4b..739372d6f2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3RevokeSecretRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/security/S3RevokeSecretRequest.java
@@ -18,13 +18,9 @@
 
 package org.apache.hadoop.ozone.om.request.s3.security;
 
-import com.google.common.base.Optional;
-import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
-import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.audit.OMAction;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.request.OMClientRequest;
@@ -43,8 +39,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.S3_SECRET_LOCK;
-
 /**
  * Handles RevokeS3Secret request.
  */
@@ -91,41 +85,36 @@ public class S3RevokeSecretRequest extends OMClientRequest {
     OMClientResponse omClientResponse = null;
     OMResponse.Builder omResponse =
             OmResponseUtil.getOMResponseBuilder(getOmRequest());
-    boolean acquiredLock = false;
     IOException exception = null;
-    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
 
     final RevokeS3SecretRequest revokeS3SecretRequest =
         getOmRequest().getRevokeS3SecretRequest();
     String kerberosID = revokeS3SecretRequest.getKerberosID();
     try {
-      acquiredLock =
-         omMetadataManager.getLock().acquireWriteLock(S3_SECRET_LOCK,
-             kerberosID);
-
-      // Remove if entry exists in table
-      if (omMetadataManager.getS3SecretTable().isExist(kerberosID)) {
-        // Invalid entry in table cache immediately
-        omMetadataManager.getS3SecretTable().addCacheEntry(
-            new CacheKey<>(kerberosID),
-            new CacheValue<>(Optional.absent(), transactionLogIndex));
-        omClientResponse = new S3RevokeSecretResponse(kerberosID,
-                omResponse.setStatus(Status.OK).build());
-      } else {
-        omClientResponse = new S3RevokeSecretResponse(null,
-                omResponse.setStatus(Status.S3_SECRET_NOT_FOUND).build());
-      }
+      omClientResponse = ozoneManager.getS3SecretManager()
+          .doUnderLock(kerberosID, s3SecretManager -> {
+            // Remove if entry exists in table
+            if (s3SecretManager.hasS3Secret(kerberosID)) {
+              // Invalidate the entry in the table cache immediately
+              s3SecretManager.invalidateCacheEntry(kerberosID,
+                  transactionLogIndex);
+              return new S3RevokeSecretResponse(kerberosID,
+                  s3SecretManager,
+                  omResponse.setStatus(Status.OK).build());
+            } else {
+              return new S3RevokeSecretResponse(null,
+                  s3SecretManager,
+                  omResponse.setStatus(Status.S3_SECRET_NOT_FOUND).build());
+            }
+          });
     } catch (IOException ex) {
       exception = ex;
       omClientResponse = new S3RevokeSecretResponse(null,
+          ozoneManager.getS3SecretManager(),
           createErrorOMResponse(omResponse, ex));
     } finally {
       addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
           ozoneManagerDoubleBufferHelper);
-      if (acquiredLock) {
-        omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK,
-            kerberosID);
-      }
     }
 
     Map<String, String> auditMap = new HashMap<>();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java
index 2b3708bfd5..030868f2a5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java
@@ -57,7 +57,6 @@ import java.util.Map;
 import java.util.TreeSet;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_MAXIMUM_ACCESS_ID_LENGTH;
-import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.S3_SECRET_LOCK;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.MULTITENANCY_SCHEMA;
 
@@ -213,7 +212,6 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
     final String awsSecret = updateGetS3SecretRequest.getAwsSecret();
 
     boolean acquiredVolumeLock = false;
-    boolean acquiredS3SecretLock = false;
     final Map<String, String> auditMap = new HashMap<>();
     final OMMetadataManager omMetadataManager =
         ozoneManager.getMetadataManager();
@@ -301,24 +299,21 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
           new CacheValue<>(Optional.of(principalInfo),
               transactionLogIndex));
 
-      // Add S3SecretTable cache entry
-      acquiredS3SecretLock = omMetadataManager.getLock()
-          .acquireWriteLock(S3_SECRET_LOCK, accessId);
-
       // Expect accessId absence from S3SecretTable
-      if (omMetadataManager.getS3SecretTable().isExist(accessId)) {
-        LOG.error("accessId '{}' already exists in S3SecretTable", accessId);
-        throw new OMException("accessId '" + accessId +
-            "' already exists in S3SecretTable",
-            ResultCodes.TENANT_USER_ACCESS_ID_ALREADY_EXISTS);
-      }
-
-      omMetadataManager.getS3SecretTable().addCacheEntry(
-          new CacheKey<>(accessId),
-          new CacheValue<>(Optional.of(s3SecretValue), transactionLogIndex));
-
-      omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK, accessId);
-      acquiredS3SecretLock = false;
+      ozoneManager.getS3SecretManager()
+          .doUnderLock(accessId, s3SecretManager -> {
+            if (s3SecretManager.hasS3Secret(accessId)) {
+              LOG.error("accessId '{}' already exists in S3SecretTable",
+                  accessId);
+              throw new OMException("accessId '" + accessId +
+                  "' already exists in S3SecretTable",
+                  ResultCodes.TENANT_USER_ACCESS_ID_ALREADY_EXISTS);
+            }
+
+            s3SecretManager
+                .updateCache(accessId, s3SecretValue, transactionLogIndex);
+            return null;
+          });
 
       // Update tenant cache
       multiTenantManager.getCacheOp()
@@ -332,7 +327,8 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
               .build());
       omClientResponse = new OMTenantAssignUserAccessIdResponse(
           omResponse.build(), s3SecretValue, userPrincipal,
-          accessId, omDBAccessIdInfo, principalInfo);
+          accessId, omDBAccessIdInfo, principalInfo,
+          ozoneManager.getS3SecretManager());
     } catch (IOException ex) {
       exception = ex;
       omResponse.setTenantAssignUserAccessIdResponse(
@@ -342,9 +338,6 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
     } finally {
       addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
           ozoneManagerDoubleBufferHelper);
-      if (acquiredS3SecretLock) {
-        omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK, accessId);
-      }
       if (acquiredVolumeLock) {
         Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java
index 354d6acb32..dc4afeb044 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OMMultiTenantManager;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.S3SecretManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
@@ -51,7 +52,6 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.S3_SECRET_LOCK;
 import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
 import static org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature.MULTITENANCY_SCHEMA;
 
@@ -173,7 +173,6 @@ public class OMTenantRevokeUserAccessIdRequest extends OMClientRequest {
     final String accessId = request.getAccessId();
     final String tenantId = request.getTenantId();
 
-    boolean acquiredS3SecretLock = false;
     boolean acquiredVolumeLock = false;
     IOException exception = null;
 
@@ -210,15 +209,8 @@ public class OMTenantRevokeUserAccessIdRequest extends OMClientRequest {
           new CacheKey<>(accessId),
           new CacheValue<>(Optional.absent(), transactionLogIndex));
 
-      // Remove from S3SecretTable.
-      // Note: S3SecretTable will be deprecated in the future.
-      acquiredS3SecretLock = omMetadataManager.getLock()
-          .acquireWriteLock(S3_SECRET_LOCK, accessId);
-
-      omMetadataManager.getS3SecretTable().addCacheEntry(
-          new CacheKey<>(accessId),
-          new CacheValue<>(Optional.absent(), transactionLogIndex));
-
+      S3SecretManager s3SecretManager = ozoneManager.getS3SecretManager();
+      s3SecretManager.invalidateCacheEntry(accessId, transactionLogIndex);
       // Update tenant cache
       multiTenantManager.getCacheOp().revokeUserAccessId(accessId, tenantId);
 
@@ -227,7 +219,8 @@ public class OMTenantRevokeUserAccessIdRequest extends OMClientRequest {
           TenantRevokeUserAccessIdResponse.newBuilder()
               .build());
       omClientResponse = new OMTenantRevokeUserAccessIdResponse(
-          omResponse.build(), accessId, userPrincipal, principalInfo);
+          omResponse.build(), accessId, userPrincipal, principalInfo,
+          s3SecretManager);
 
     } catch (IOException ex) {
       exception = ex;
@@ -237,9 +230,6 @@ public class OMTenantRevokeUserAccessIdRequest extends OMClientRequest {
     } finally {
       addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
           ozoneManagerDoubleBufferHelper);
-      if (acquiredS3SecretLock) {
-        omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK, accessId);
-      }
       if (acquiredVolumeLock) {
         Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/OMSetSecretResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/OMSetSecretResponse.java
index b94bc92ebd..caef3270fd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/OMSetSecretResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/OMSetSecretResponse.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.om.response.s3.security;
 
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.S3SecretManager;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -45,13 +46,16 @@ public class OMSetSecretResponse extends OMClientResponse {
 
   private String accessId;
   private S3SecretValue s3SecretValue;
+  private S3SecretManager secretManager;
 
   public OMSetSecretResponse(@Nullable String accessId,
                              @Nullable S3SecretValue s3SecretValue,
+                             @Nonnull S3SecretManager secretManager,
                              @Nonnull OMResponse omResponse) {
     super(omResponse);
     this.accessId = accessId;
     this.s3SecretValue = s3SecretValue;
+    this.secretManager = secretManager;
   }
 
   /**
@@ -72,8 +76,12 @@ public class OMSetSecretResponse extends OMClientResponse {
 
     if (s3SecretValue != null) {
       LOG.debug("Updating TenantAccessIdTable");
-      omMetadataManager.getS3SecretTable().putWithBatch(batchOperation,
-          accessId, s3SecretValue);
+      if (secretManager.isBatchSupported()) {
+        secretManager.batcher().addWithBatch(batchOperation,
+            accessId, s3SecretValue);
+      } else {
+        secretManager.storeSecret(accessId, s3SecretValue);
+      }
     }
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/S3GetSecretResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/S3GetSecretResponse.java
index 29f64cae45..f6194734c4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/S3GetSecretResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/S3GetSecretResponse.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.om.response.s3.security;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.S3SecretManager;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -39,11 +40,14 @@ import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.S3_SECRET_TABLE;
 @CleanupTableInfo(cleanupTables = {S3_SECRET_TABLE})
 public class S3GetSecretResponse extends OMClientResponse {
 
-  private S3SecretValue s3SecretValue;
+  private final S3SecretValue s3SecretValue;
+  private final S3SecretManager s3SecretManager;
 
   public S3GetSecretResponse(@Nullable S3SecretValue s3SecretValue,
+      @Nonnull S3SecretManager secretManager,
       @Nonnull OMResponse omResponse) {
     super(omResponse);
+    this.s3SecretManager = secretManager;
     this.s3SecretValue = s3SecretValue;
   }
 
@@ -51,10 +55,16 @@ public class S3GetSecretResponse extends OMClientResponse {
   public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
 
-    if (s3SecretValue != null &&
-        getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
-      omMetadataManager.getS3SecretTable().putWithBatch(batchOperation,
-          s3SecretValue.getKerberosID(), s3SecretValue);
+    boolean isOk
+        = getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK;
+    if (s3SecretValue != null && isOk) {
+      if (s3SecretManager.isBatchSupported()) {
+        s3SecretManager.batcher().addWithBatch(batchOperation,
+            s3SecretValue.getKerberosID(), s3SecretValue);
+      } else {
+        s3SecretManager.storeSecret(s3SecretValue.getKerberosID(),
+            s3SecretValue);
+      }
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/S3RevokeSecretResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/S3RevokeSecretResponse.java
index 05633010eb..5391bc2be0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/S3RevokeSecretResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/security/S3RevokeSecretResponse.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.om.response.s3.security;
 
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.S3SecretManager;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
@@ -38,20 +39,27 @@ import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.S3_SECRET_TABLE;
 public class S3RevokeSecretResponse extends OMClientResponse {
 
   private final String kerberosID;
+  private final S3SecretManager s3SecretManager;
 
   public S3RevokeSecretResponse(@Nullable String kerberosID,
+                                @Nonnull S3SecretManager s3SecretManager,
                                 @Nonnull OMResponse omResponse) {
     super(omResponse);
     this.kerberosID = kerberosID;
+    this.s3SecretManager = s3SecretManager;
   }
 
   @Override
   public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
 
-    if (kerberosID != null && getOMResponse().getStatus() == Status.OK) {
-      omMetadataManager.getS3SecretTable().deleteWithBatch(batchOperation,
-          kerberosID);
+    if (kerberosID != null
+        && getOMResponse().getStatus() == Status.OK) {
+      if (s3SecretManager.isBatchSupported()) {
+        s3SecretManager.batcher().deleteWithBatch(batchOperation, kerberosID);
+      } else {
+        s3SecretManager.revokeSecret(kerberosID);
+      }
     }
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantAssignUserAccessIdResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantAssignUserAccessIdResponse.java
index e2f273f995..ba5ad61696 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantAssignUserAccessIdResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantAssignUserAccessIdResponse.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.om.response.s3.tenant;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.S3SecretManager;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBUserPrincipalInfo;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@@ -50,6 +51,7 @@ public class OMTenantAssignUserAccessIdResponse extends OMClientResponse {
   private String principal, accessId;
   private OmDBAccessIdInfo omDBAccessIdInfo;
   private OmDBUserPrincipalInfo omDBUserPrincipalInfo;
+  private S3SecretManager s3SecretManager;
 
   @SuppressWarnings("checkstyle:parameternumber")
   public OMTenantAssignUserAccessIdResponse(@Nonnull OMResponse omResponse,
@@ -57,7 +59,8 @@ public class OMTenantAssignUserAccessIdResponse extends OMClientResponse {
       @Nonnull String principal,
       @Nonnull String accessId,
       @Nonnull OmDBAccessIdInfo omDBAccessIdInfo,
-      @Nonnull OmDBUserPrincipalInfo omDBUserPrincipalInfo
+      @Nonnull OmDBUserPrincipalInfo omDBUserPrincipalInfo,
+      @Nonnull S3SecretManager s3SecretManager
   ) {
     super(omResponse);
     this.s3SecretValue = s3SecretValue;
@@ -65,6 +68,7 @@ public class OMTenantAssignUserAccessIdResponse extends OMClientResponse {
     this.accessId = accessId;
     this.omDBAccessIdInfo = omDBAccessIdInfo;
     this.omDBUserPrincipalInfo = omDBUserPrincipalInfo;
+    this.s3SecretManager = s3SecretManager;
   }
 
   /**
@@ -80,11 +84,16 @@ public class OMTenantAssignUserAccessIdResponse extends OMClientResponse {
   public void addToDBBatch(OMMetadataManager omMetadataManager,
       BatchOperation batchOperation) throws IOException {
 
-    if (s3SecretValue != null &&
-        getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
-      // Add S3SecretTable entry
-      omMetadataManager.getS3SecretTable().putWithBatch(batchOperation,
-          accessId, s3SecretValue);
+    boolean isOk =
+        getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK;
+    if (s3SecretValue != null && isOk) {
+      if (s3SecretManager.isBatchSupported()) {
+        // Add S3SecretTable entry
+        s3SecretManager.batcher().addWithBatch(batchOperation,
+            accessId, s3SecretValue);
+      } else {
+        s3SecretManager.storeSecret(accessId, s3SecretValue);
+      }
     }
 
     omMetadataManager.getTenantAccessIdTable().putWithBatch(
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantRevokeUserAccessIdResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantRevokeUserAccessIdResponse.java
index ff137226fe..93f3b8aa67 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantRevokeUserAccessIdResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMTenantRevokeUserAccessIdResponse.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.om.response.s3.tenant;
 
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.S3SecretManager;
 import org.apache.hadoop.ozone.om.helpers.OmDBUserPrincipalInfo;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -45,17 +46,20 @@ public class OMTenantRevokeUserAccessIdResponse extends OMClientResponse {
 
   private String principal, accessId;
   private OmDBUserPrincipalInfo omDBUserPrincipalInfo;
+  private S3SecretManager s3SecretManager;
 
   @SuppressWarnings("checkstyle:parameternumber")
   public OMTenantRevokeUserAccessIdResponse(@Nonnull OMResponse omResponse,
       @Nonnull String accessId,
       @Nonnull String principal,
-      @Nonnull OmDBUserPrincipalInfo omDBUserPrincipalInfo
+      @Nonnull OmDBUserPrincipalInfo omDBUserPrincipalInfo,
+      @Nonnull S3SecretManager s3SecretManager
   ) {
     super(omResponse);
     this.principal = principal;
     this.accessId = accessId;
     this.omDBUserPrincipalInfo = omDBUserPrincipalInfo;
+    this.s3SecretManager = s3SecretManager;
   }
 
   /**
@@ -74,8 +78,11 @@ public class OMTenantRevokeUserAccessIdResponse extends OMClientResponse {
     assert (accessId != null);
     // TODO: redundant check? Is status always OK when addToDBBatch is called
     if (getOMResponse().getStatus() == OzoneManagerProtocolProtos.Status.OK) {
-      omMetadataManager.getS3SecretTable().deleteWithBatch(batchOperation,
-          accessId);
+      if (s3SecretManager.isBatchSupported()) {
+        s3SecretManager.batcher().deleteWithBatch(batchOperation, accessId);
+      } else {
+        s3SecretManager.revokeSecret(accessId);
+      }
     }
 
     omMetadataManager.getTenantAccessIdTable().deleteWithBatch(
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java
index 89427ed595..a823d4a1c3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.S3SecretManager;
-import org.apache.hadoop.ozone.om.S3SecretManagerImpl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
 import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
@@ -69,7 +68,7 @@ public class OzoneDelegationTokenSecretManager
       .getLogger(OzoneDelegationTokenSecretManager.class);
   private final Map<OzoneTokenIdentifier, TokenInfo> currentTokens;
   private final OzoneSecretStore store;
-  private final S3SecretManagerImpl s3SecretManager;
+  private final S3SecretManager s3SecretManager;
   private Thread tokenRemoverThread;
   private final long tokenRemoverScanInterval;
   private String omCertificateSerialId;
@@ -93,12 +92,12 @@ public class OzoneDelegationTokenSecretManager
         b.tokenRenewInterval, b.service, LOG);
     setCertClient(b.certClient);
     this.omServiceId = b.omServiceId;
-    currentTokens = new ConcurrentHashMap();
+    currentTokens = new ConcurrentHashMap<>();
     this.tokenRemoverScanInterval = b.tokenRemoverScanInterval;
-    this.s3SecretManager = (S3SecretManagerImpl) b.s3SecretManager;
-    this.store = new OzoneSecretStore(b.ozoneConf,
-        this.s3SecretManager.getOmMetadataManager());
+    this.s3SecretManager = b.s3SecretManager;
     this.ozoneManager = b.ozoneManager;
+    this.store = new OzoneSecretStore(b.ozoneConf,
+        this.ozoneManager.getMetadataManager());
     isRatisEnabled = b.ozoneConf.getBoolean(
         OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
         OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
@@ -503,7 +502,7 @@ public class OzoneDelegationTokenSecretManager
     }
     String awsSecret;
     try {
-      awsSecret = s3SecretManager.getS3UserSecretString(identifier
+      awsSecret = s3SecretManager.getSecretString(identifier
           .getAwsAccessId());
     } catch (IOException e) {
       LOG.error("Error while validating S3 identifier:{}",
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java
index 5f0b8629b6..ca0344f463 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java
@@ -23,14 +23,15 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.Server.Call;
 import org.apache.hadoop.ozone.audit.AuditLogger;
 import org.apache.hadoop.ozone.audit.AuditMessage;
-import org.apache.hadoop.ozone.om.multitenant.AuthorizerLockImpl;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OMMultiTenantManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.S3SecretLockedManager;
+import org.apache.hadoop.ozone.om.S3SecretManagerImpl;
 import org.apache.hadoop.ozone.om.TenantOp;
+import org.apache.hadoop.ozone.om.multitenant.AuthorizerLockImpl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
@@ -82,7 +83,6 @@ public class TestS3GetSecretRequest {
 
   private OzoneManager ozoneManager;
   private OMMetrics omMetrics;
-  private OMMetadataManager omMetadataManager;
   private AuditLogger auditLogger;
   // Set ozoneManagerDoubleBuffer to do nothing.
   private final OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper =
@@ -129,10 +129,15 @@ public class TestS3GetSecretRequest {
         folder.newFolder().getAbsolutePath());
     // No need to conf.set(OzoneConfigKeys.OZONE_ADMINISTRATORS, ...) here
     //  as we did the trick earlier with mockito.
-    omMetadataManager = new OmMetadataManagerImpl(conf);
+    OmMetadataManagerImpl omMetadataManager = new OmMetadataManagerImpl(conf);
     when(ozoneManager.getMetrics()).thenReturn(omMetrics);
     when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
     when(ozoneManager.isRatisEnabled()).thenReturn(true);
+    S3SecretLockedManager secretManager = new S3SecretLockedManager(
+        new S3SecretManagerImpl(omMetadataManager, omMetadataManager),
+        omMetadataManager.getLock());
+    when(ozoneManager.getS3SecretManager()).thenReturn(secretManager);
+
     auditLogger = mock(AuditLogger.class);
     when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
     doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/S3SecretStoreMap.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/S3SecretStoreMap.java
new file mode 100644
index 0000000000..67efab4b0e
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/S3SecretStoreMap.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.security;
+
+import org.apache.hadoop.ozone.om.S3Batcher;
+import org.apache.hadoop.ozone.om.S3SecretStore;
+import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * In-memory {@link S3SecretStore} backed by a {@link ConcurrentHashMap}.
+ */
+public class S3SecretStoreMap implements S3SecretStore {
+  private final Map<String, S3SecretValue> map = new ConcurrentHashMap<>();
+
+  public S3SecretStoreMap(Map<String, S3SecretValue> map) {
+    this.map.putAll(map); // copy entries; the caller's map is not retained
+  }
+
+  @Override
+  public void storeSecret(String kerberosId, S3SecretValue secret)
+      throws IOException {
+    map.put(kerberosId, secret); // replaces any existing entry for this id
+  }
+
+  @Override
+  public S3SecretValue getSecret(String kerberosID) throws IOException {
+    return map.get(kerberosID); // null when no secret is stored for the id
+  }
+
+  @Override
+  public void revokeSecret(String kerberosId) throws IOException {
+    map.remove(kerberosId); // no-op when the id is absent
+  }
+
+  @Override
+  public S3Batcher batcher() {
+    return null; // no batcher is provided by this map-backed store
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java
index 0003c73efc..2f21a48cda 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.S3SecretCache;
+import org.apache.hadoop.ozone.om.S3SecretLockedManager;
 import org.apache.hadoop.ozone.om.S3SecretManager;
 import org.apache.hadoop.ozone.om.S3SecretManagerImpl;
 import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
@@ -64,6 +66,9 @@ import org.mockito.Mockito;
  */
 public class TestOzoneDelegationTokenSecretManager {
 
+  private static final long TOKEN_MAX_LIFETIME = 1000 * 20;
+  private static final long TOKEN_REMOVER_SCAN_INTERVAL = 1000 * 20;
+
   private OzoneManager om;
   private OzoneDelegationTokenSecretManager secretManager;
   private SecurityConfig securityConfig;
@@ -72,10 +77,7 @@ public class TestOzoneDelegationTokenSecretManager {
   private Text serviceRpcAdd;
   private OzoneConfiguration conf;
   private static final Text TEST_USER = new Text("testUser");
-  private long tokenMaxLifetime = 1000 * 20;
-  private long tokenRemoverScanInterval = 1000 * 20;
   private S3SecretManager s3SecretManager;
-  private String s3Secret = "dbaksbzljandlkandlsd";
 
   @Rule
   public TemporaryFolder folder = new TemporaryFolder();
@@ -88,28 +90,19 @@ public class TestOzoneDelegationTokenSecretManager {
     certificateClient.init();
     expiryTime = Time.monotonicNow() + 60 * 60 * 24;
     serviceRpcAdd = new Text("localhost");
-    final Map<String, String> s3Secrets = new HashMap<>();
-    s3Secrets.put("testuser1", s3Secret);
-    s3Secrets.put("abc", "djakjahkd");
+    final Map<String, S3SecretValue> s3Secrets = new HashMap<>();
+    s3Secrets.put("testuser1",
+        new S3SecretValue("testuser1", "dbaksbzljandlkandlsd"));
+    s3Secrets.put("abc",
+        new S3SecretValue("abc", "djakjahkd"));
     om = Mockito.mock(OzoneManager.class);
     OMMetadataManager metadataManager = new OmMetadataManagerImpl(conf);
-    s3SecretManager = new S3SecretManagerImpl(conf, metadataManager) {
-      @Override
-      public S3SecretValue getS3Secret(String kerberosID) {
-        if (s3Secrets.containsKey(kerberosID)) {
-          return new S3SecretValue(kerberosID, s3Secrets.get(kerberosID));
-        }
-        return null;
-      }
-
-      @Override
-      public String getS3UserSecretString(String awsAccessKey) {
-        if (s3Secrets.containsKey(awsAccessKey)) {
-          return s3Secrets.get(awsAccessKey);
-        }
-        return null;
-      }
-    };
+    Mockito.when(om.getMetadataManager()).thenReturn(metadataManager);
+    s3SecretManager = new S3SecretLockedManager(
+        new S3SecretManagerImpl(new S3SecretStoreMap(s3Secrets),
+            Mockito.mock(S3SecretCache.class)),
+        metadataManager.getLock()
+    );
   }
 
   private OzoneConfiguration createNewTestPath() throws IOException {
@@ -168,8 +161,8 @@ public class TestOzoneDelegationTokenSecretManager {
 
   @Test
   public void testLeadershipCheckinRetrievePassword() throws Exception {
-    secretManager = createSecretManager(conf, tokenMaxLifetime,
-        expiryTime, tokenRemoverScanInterval);
+    secretManager = createSecretManager(conf, TOKEN_MAX_LIFETIME,
+        expiryTime, TOKEN_REMOVER_SCAN_INTERVAL);
     Mockito.doThrow(new OMNotLeaderException("Not leader"))
         .when(om).checkLeaderStatus();
     OzoneTokenIdentifier identifier = new OzoneTokenIdentifier();
@@ -201,8 +194,8 @@ public class TestOzoneDelegationTokenSecretManager {
 
   @Test
   public void testCreateToken() throws Exception {
-    secretManager = createSecretManager(conf, tokenMaxLifetime,
-        expiryTime, tokenRemoverScanInterval);
+    secretManager = createSecretManager(conf, TOKEN_MAX_LIFETIME,
+        expiryTime, TOKEN_REMOVER_SCAN_INTERVAL);
     secretManager.start(certificateClient);
     Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
         TEST_USER, TEST_USER);
@@ -219,14 +212,14 @@ public class TestOzoneDelegationTokenSecretManager {
   private void restartSecretManager() throws IOException {
     secretManager.stop();
     secretManager = null;
-    secretManager = createSecretManager(conf, tokenMaxLifetime,
-        expiryTime, tokenRemoverScanInterval);
+    secretManager = createSecretManager(conf, TOKEN_MAX_LIFETIME,
+        expiryTime, TOKEN_REMOVER_SCAN_INTERVAL);
   }
 
   private void testRenewTokenSuccessHelper(boolean restartSecretManager)
       throws Exception {
-    secretManager = createSecretManager(conf, tokenMaxLifetime,
-        expiryTime, tokenRemoverScanInterval);
+    secretManager = createSecretManager(conf, TOKEN_MAX_LIFETIME,
+        expiryTime, TOKEN_REMOVER_SCAN_INTERVAL);
     secretManager.start(certificateClient);
     Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
         TEST_USER,
@@ -256,8 +249,8 @@ public class TestOzoneDelegationTokenSecretManager {
    */
   @Test
   public void testRenewTokenFailure() throws Exception {
-    secretManager = createSecretManager(conf, tokenMaxLifetime,
-        expiryTime, tokenRemoverScanInterval);
+    secretManager = createSecretManager(conf, TOKEN_MAX_LIFETIME,
+        expiryTime, TOKEN_REMOVER_SCAN_INTERVAL);
     secretManager.start(certificateClient);
     Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
         TEST_USER, TEST_USER);
@@ -273,7 +266,7 @@ public class TestOzoneDelegationTokenSecretManager {
   @Test
   public void testRenewTokenFailureMaxTime() throws Exception {
     secretManager = createSecretManager(conf, 100,
-        100, tokenRemoverScanInterval);
+        100, TOKEN_REMOVER_SCAN_INTERVAL);
     secretManager.start(certificateClient);
     Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
         TEST_USER,
@@ -291,7 +284,7 @@ public class TestOzoneDelegationTokenSecretManager {
   @Test
   public void testRenewTokenFailureRenewalTime() throws Exception {
     secretManager = createSecretManager(conf, 1000 * 10,
-        10, tokenRemoverScanInterval);
+        10, TOKEN_REMOVER_SCAN_INTERVAL);
     secretManager.start(certificateClient);
     Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
         TEST_USER,
@@ -304,8 +297,8 @@ public class TestOzoneDelegationTokenSecretManager {
 
   @Test
   public void testCreateIdentifier() throws Exception {
-    secretManager = createSecretManager(conf, tokenMaxLifetime,
-        expiryTime, tokenRemoverScanInterval);
+    secretManager = createSecretManager(conf, TOKEN_MAX_LIFETIME,
+        expiryTime, TOKEN_REMOVER_SCAN_INTERVAL);
     secretManager.start(certificateClient);
     OzoneTokenIdentifier identifier = secretManager.createIdentifier();
     // Check basic details.
@@ -316,8 +309,8 @@ public class TestOzoneDelegationTokenSecretManager {
 
   @Test
   public void testCancelTokenSuccess() throws Exception {
-    secretManager = createSecretManager(conf, tokenMaxLifetime,
-        expiryTime, tokenRemoverScanInterval);
+    secretManager = createSecretManager(conf, TOKEN_MAX_LIFETIME,
+        expiryTime, TOKEN_REMOVER_SCAN_INTERVAL);
     secretManager.start(certificateClient);
     Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
         TEST_USER, TEST_USER);
@@ -326,8 +319,8 @@ public class TestOzoneDelegationTokenSecretManager {
 
   @Test
   public void testCancelTokenFailure() throws Exception {
-    secretManager = createSecretManager(conf, tokenMaxLifetime,
-        expiryTime, tokenRemoverScanInterval);
+    secretManager = createSecretManager(conf, TOKEN_MAX_LIFETIME,
+        expiryTime, TOKEN_REMOVER_SCAN_INTERVAL);
     secretManager.start(certificateClient);
     Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
         TEST_USER,
@@ -340,8 +333,8 @@ public class TestOzoneDelegationTokenSecretManager {
 
   @Test
   public void testVerifySignatureSuccess() throws Exception {
-    secretManager = createSecretManager(conf, tokenMaxLifetime,
-        expiryTime, tokenRemoverScanInterval);
+    secretManager = createSecretManager(conf, TOKEN_MAX_LIFETIME,
+        expiryTime, TOKEN_REMOVER_SCAN_INTERVAL);
     secretManager.start(certificateClient);
     OzoneTokenIdentifier id = new OzoneTokenIdentifier();
     id.setOmCertSerialId(certificateClient.getCertificate()
@@ -354,8 +347,8 @@ public class TestOzoneDelegationTokenSecretManager {
 
   @Test
   public void testVerifySignatureFailure() throws Exception {
-    secretManager = createSecretManager(conf, tokenMaxLifetime,
-        expiryTime, tokenRemoverScanInterval);
+    secretManager = createSecretManager(conf, TOKEN_MAX_LIFETIME,
+        expiryTime, TOKEN_REMOVER_SCAN_INTERVAL);
     secretManager.start(certificateClient);
     OzoneTokenIdentifier id = new OzoneTokenIdentifier();
     // set invalid om cert serial id
@@ -367,8 +360,8 @@ public class TestOzoneDelegationTokenSecretManager {
 
   @Test
   public void testValidateS3AUTHINFOSuccess() throws Exception {
-    secretManager = createSecretManager(conf, tokenMaxLifetime,
-        expiryTime, tokenRemoverScanInterval);
+    secretManager = createSecretManager(conf, TOKEN_MAX_LIFETIME,
+        expiryTime, TOKEN_REMOVER_SCAN_INTERVAL);
     secretManager.start(certificateClient);
 
     OzoneTokenIdentifier identifier = new OzoneTokenIdentifier();
@@ -386,8 +379,8 @@ public class TestOzoneDelegationTokenSecretManager {
 
   @Test
   public void testValidateS3AUTHINFOFailure() throws Exception {
-    secretManager = createSecretManager(conf, tokenMaxLifetime,
-        expiryTime, tokenRemoverScanInterval);
+    secretManager = createSecretManager(conf, TOKEN_MAX_LIFETIME,
+        expiryTime, TOKEN_REMOVER_SCAN_INTERVAL);
     secretManager.start(certificateClient);
 
     OzoneTokenIdentifier identifier = new OzoneTokenIdentifier();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org