You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ri...@apache.org on 2023/04/06 01:24:51 UTC

[ozone] branch HDDS-7733-Symmetric-Tokens updated: HDDS-7831. Use symmetric secret key to sign and verify token (#4417)

This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch HDDS-7733-Symmetric-Tokens
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7733-Symmetric-Tokens by this push:
     new 84e260196f HDDS-7831. Use symmetric secret key to sign and verify token (#4417)
84e260196f is described below

commit 84e260196fea8ad18d3be99f38ddf4a1ae34c514
Author: Duong Nguyen <du...@gmail.com>
AuthorDate: Wed Apr 5 18:24:44 2023 -0700

    HDDS-7831. Use symmetric secret key to sign and verify token (#4417)
---
 .../security/token/ContainerTokenIdentifier.java   |  17 +-
 .../security/token/OzoneBlockTokenIdentifier.java  |  26 +-
 .../security/token/ShortLivedTokenIdentifier.java  |  24 +-
 .../hadoop/hdds/security/x509/SecurityConfig.java  |   4 +
 .../org/apache/hadoop/ozone/OzoneSecurityUtil.java |   1 +
 .../apache/hadoop/ozone/HddsDatanodeService.java   |  19 +-
 .../common/statemachine/DatanodeStateMachine.java  |  12 +-
 .../ECReconstructionCoordinator.java               |  10 +-
 .../container/ec/reconstruction/TokenHelper.java   |  44 +---
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  17 +-
 .../ozone/container/common/ContainerTestUtils.java |   2 +-
 .../container/common/TestDatanodeStateMachine.java |  11 +-
 .../container/ozoneimpl/TestOzoneContainer.java    |   6 +-
 .../upgrade/TestDataNodeStartupSlvLessThanMlv.java |   3 +-
 .../upgrade/TestDatanodeUpgradeToSchemaV3.java     |  10 +-
 .../upgrade/TestDatanodeUpgradeToScmHA.java        |   8 +-
 .../security/symmetric/DefaultSecretKeyClient.java |  74 ++++++
 .../symmetric/DefaultSecretKeySignerClient.java    | 130 ++++++++++
 .../symmetric/DefaultSecretKeyVerifierClient.java  |  99 +++++++
 .../hdds/security/symmetric/ManagedSecretKey.java  |  42 ++-
 .../hdds/security/symmetric/SecretKeyClient.java   |  26 ++
 .../hdds/security/symmetric/SecretKeyConfig.java   |  26 +-
 .../hdds/security/symmetric/SecretKeyManager.java  |   8 +-
 .../security/symmetric/SecretKeySignerClient.java  |  43 +++
 .../security/symmetric/SecretKeyStateImpl.java     |   3 +-
 .../symmetric/SecretKeyVerifierClient.java         |  30 +++
 .../hdds/security/token/BlockTokenVerifier.java    |   7 +-
 .../token/ContainerTokenSecretManager.java         |  14 +-
 .../security/token/ContainerTokenVerifier.java     |   6 +-
 .../token/OzoneBlockTokenSecretManager.java        |  66 +----
 .../token/ShortLivedTokenSecretManager.java        |  75 ++----
 .../security/token/ShortLivedTokenVerifier.java    |  66 +++--
 .../hadoop/hdds/security/token/TokenVerifier.java  |   9 +-
 .../security/symmetric/ManagedSecretKeyTest.java   |  75 ++++++
 .../security/symmetric/SecretKeyManagerTest.java   |  13 +-
 .../hdds/security/symmetric/SecretKeyTestUtil.java |  52 ++++
 .../security/token/TestBlockTokenVerifier.java     |  18 +-
 .../security/token/TestContainerTokenVerifier.java |  12 +-
 .../token/TestOzoneBlockTokenIdentifier.java       | 287 +++------------------
 .../token/TestOzoneBlockTokenSecretManager.java    | 240 ++++-------------
 .../hdds/security/token/TokenVerifierTests.java    | 168 +++++++-----
 .../org.mockito.plugins.MockMaker                  |  16 ++
 .../src/main/proto/ScmAdminProtocol.proto          |   3 +-
 .../interface-client/src/main/proto/hdds.proto     |   3 +-
 .../hdds/scm/server/SCMSecurityProtocolServer.java |   4 +-
 .../hdds/scm/server/StorageContainerManager.java   |  55 +---
 .../ozone/container/common/TestEndPoint.java       |  12 +-
 .../hdds/scm/storage/TestContainerCommandsEC.java  |  28 +-
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   7 +
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  17 +-
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |   8 +-
 .../org/apache/hadoop/ozone/TestSecretKeysApi.java |   1 -
 .../hadoop/ozone/TestSecureOzoneCluster.java       | 255 ------------------
 .../hadoop/ozone/client/SecretKeyTestClient.java   |  73 ++++++
 .../client/rpc/TestContainerStateMachine.java      |   2 +
 .../rpc/TestContainerStateMachineFlushDelay.java   |   2 +
 .../client/rpc/TestOzoneAtRestEncryption.java      |   2 +
 .../ozone/client/rpc/TestSecureOzoneRpcClient.java |   8 +-
 .../container/ozoneimpl/TestOzoneContainer.java    |   5 +-
 .../ozoneimpl/TestOzoneContainerWithTLS.java       |  20 +-
 .../ozoneimpl/TestSecureOzoneContainer.java        |  18 +-
 .../server/TestSecureContainerServer.java          |  17 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  64 +++--
 .../ozone/security/TestOzoneManagerBlockToken.java | 251 ------------------
 64 files changed, 1199 insertions(+), 1475 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/ContainerTokenIdentifier.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/ContainerTokenIdentifier.java
index ebb0709d4b..7cd8c279ba 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/ContainerTokenIdentifier.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/ContainerTokenIdentifier.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerTokenSecretProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ProtobufUtils;
 
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -28,6 +29,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Objects;
+import java.util.UUID;
 
 /**
  * Token identifier for container operations, similar to block token.
@@ -43,11 +45,18 @@ public class ContainerTokenIdentifier extends ShortLivedTokenIdentifier {
   }
 
   public ContainerTokenIdentifier(String ownerId, ContainerID containerID,
-      String certSerialId, Instant expiryDate) {
-    super(ownerId, expiryDate, certSerialId);
+                                  Instant expiryDate) {
+    super(ownerId, expiryDate);
     this.containerID = containerID;
   }
 
+  public ContainerTokenIdentifier(String ownerId, ContainerID containerID,
+                                  UUID secretKeyId,
+                                  Instant expiryDate) {
+    this(ownerId, containerID, expiryDate);
+    setSecretKeyId(secretKeyId);
+  }
+
   @Override
   public Text getKind() {
     return KIND;
@@ -58,7 +67,7 @@ public class ContainerTokenIdentifier extends ShortLivedTokenIdentifier {
     ContainerTokenSecretProto.Builder builder = ContainerTokenSecretProto
         .newBuilder()
         .setOwnerId(getOwnerId())
-        .setCertSerialId(getCertSerialId())
+        .setSecretKeyId(ProtobufUtils.toProtobuf(getSecretKeyId()))
         .setExpiryDate(getExpiry().toEpochMilli())
         .setContainerId(containerID.getProtobuf());
     out.write(builder.build().toByteArray());
@@ -72,7 +81,7 @@ public class ContainerTokenIdentifier extends ShortLivedTokenIdentifier {
     }
     ContainerTokenSecretProto proto =
         ContainerTokenSecretProto.parseFrom((DataInputStream) in);
-    setCertSerialId(proto.getCertSerialId());
+    setSecretKeyId(ProtobufUtils.fromProtobuf(proto.getSecretKeyId()));
     setExpiry(Instant.ofEpochMilli(proto.getExpiryDate()));
     setOwnerId(proto.getOwnerId());
     this.containerID = ContainerID.getFromProtobuf(proto.getContainerId());
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenIdentifier.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenIdentifier.java
index dcd75d6334..be18f90a07 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenIdentifier.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenIdentifier.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token.TrivialRenewer;
+import org.apache.hadoop.util.ProtobufUtils;
 
 import java.io.DataInput;
 import java.io.DataInputStream;
@@ -59,16 +60,14 @@ public class OzoneBlockTokenIdentifier extends ShortLivedTokenIdentifier {
   }
 
   public OzoneBlockTokenIdentifier(String ownerId, BlockID blockId,
-      Set<AccessModeProto> modes, long expiryDate, String omCertSerialId,
-      long maxLength) {
-    this(ownerId, getTokenService(blockId), modes, expiryDate, omCertSerialId,
+      Set<AccessModeProto> modes, long expiryDate, long maxLength) {
+    this(ownerId, getTokenService(blockId), modes, expiryDate,
         maxLength);
   }
 
   public OzoneBlockTokenIdentifier(String ownerId, String blockId,
-      Set<AccessModeProto> modes, long expiryDate, String omCertSerialId,
-      long maxLength) {
-    super(ownerId, Instant.ofEpochMilli(expiryDate), omCertSerialId);
+      Set<AccessModeProto> modes, long expiryDate, long maxLength) {
+    super(ownerId, Instant.ofEpochMilli(expiryDate));
     this.blockId = blockId;
     this.modes = modes == null
         ? EnumSet.noneOf(AccessModeProto.class) : EnumSet.copyOf(modes);
@@ -136,7 +135,7 @@ public class OzoneBlockTokenIdentifier extends ShortLivedTokenIdentifier {
         BlockTokenSecretProto.parseFrom((DataInputStream) in);
     setOwnerId(token.getOwnerId());
     setExpiry(Instant.ofEpochMilli(token.getExpiryDate()));
-    setCertSerialId(token.getOmCertSerialId());
+    setSecretKeyId(ProtobufUtils.fromProtobuf(token.getSecretKeyId()));
     this.blockId = token.getBlockId();
     this.modes = EnumSet.copyOf(token.getModesList());
     this.maxLength = token.getMaxLength();
@@ -147,10 +146,13 @@ public class OzoneBlockTokenIdentifier extends ShortLivedTokenIdentifier {
       throws IOException {
     BlockTokenSecretProto token =
         BlockTokenSecretProto.parseFrom((DataInputStream) in);
-    return new OzoneBlockTokenIdentifier(token.getOwnerId(),
-        token.getBlockId(), EnumSet.copyOf(token.getModesList()),
-        token.getExpiryDate(), token.getOmCertSerialId(),
-        token.getMaxLength());
+    OzoneBlockTokenIdentifier tokenId =
+        new OzoneBlockTokenIdentifier(token.getOwnerId(),
+            token.getBlockId(), EnumSet.copyOf(token.getModesList()),
+            token.getExpiryDate(),
+            token.getMaxLength());
+    tokenId.setSecretKeyId(ProtobufUtils.fromProtobuf(token.getSecretKeyId()));
+    return tokenId;
   }
 
   @Override
@@ -158,7 +160,7 @@ public class OzoneBlockTokenIdentifier extends ShortLivedTokenIdentifier {
     BlockTokenSecretProto.Builder builder = BlockTokenSecretProto.newBuilder()
         .setBlockId(blockId)
         .setOwnerId(getOwnerId())
-        .setOmCertSerialId(getCertSerialId())
+        .setSecretKeyId(ProtobufUtils.toProtobuf(getSecretKeyId()))
         .setExpiryDate(getExpiryDate())
         .setMaxLength(maxLength);
     // Add access mode allowed
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/ShortLivedTokenIdentifier.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/ShortLivedTokenIdentifier.java
index 7475fa5042..dbd168f96a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/ShortLivedTokenIdentifier.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/ShortLivedTokenIdentifier.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 
 import java.time.Instant;
 import java.util.Objects;
+import java.util.UUID;
 
 /**
  * Base class for short-lived tokens (block, container).
@@ -33,18 +34,16 @@ public abstract class ShortLivedTokenIdentifier extends TokenIdentifier {
 
   private String ownerId;
   private Instant expiry;
-  private String certSerialId;
+  private UUID secretKeyId;
 
   public abstract String getService();
 
   protected ShortLivedTokenIdentifier() {
   }
 
-  protected ShortLivedTokenIdentifier(String ownerId, Instant expiry,
-      String certSerialId) {
+  protected ShortLivedTokenIdentifier(String ownerId, Instant expiry) {
     this.ownerId = ownerId;
     this.expiry = expiry;
-    this.certSerialId = certSerialId;
   }
 
   @Override
@@ -67,22 +66,23 @@ public abstract class ShortLivedTokenIdentifier extends TokenIdentifier {
     this.expiry = expiry;
   }
 
-  protected void setCertSerialId(String certSerialId) {
-    this.certSerialId = certSerialId;
+  public void setSecretKeyId(UUID secretKeyId) {
+    this.secretKeyId = secretKeyId;
   }
 
   public Instant getExpiry() {
     return expiry;
   }
 
-  public String getCertSerialId() {
-    return certSerialId;
-  }
 
   public String getOwnerId() {
     return ownerId;
   }
 
+  public UUID getSecretKeyId() {
+    return secretKeyId;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -95,18 +95,18 @@ public abstract class ShortLivedTokenIdentifier extends TokenIdentifier {
     ShortLivedTokenIdentifier that = (ShortLivedTokenIdentifier) o;
     return Objects.equals(ownerId, that.ownerId) &&
         Objects.equals(expiry, that.expiry) &&
-        Objects.equals(certSerialId, that.certSerialId);
+        Objects.equals(secretKeyId, that.secretKeyId);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(ownerId, expiry, certSerialId);
+    return Objects.hash(ownerId, expiry, secretKeyId);
   }
 
   @Override
   public String toString() {
     return "ownerId=" + ownerId +
         ", expiry=" + expiry +
-        ", certSerialId=" + certSerialId;
+        ", secretKeyId=" + secretKeyId;
   }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
index 1fe22a45c9..90a2af176f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
@@ -510,4 +510,8 @@ public class SecurityConfig {
   public long getSslTruststoreReloadInterval() {
     return truststoreReloadInterval;
   }
+
+  public boolean isTokenEnabled() {
+    return blockTokenEnabled || containerTokenEnabled;
+  }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneSecurityUtil.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneSecurityUtil.java
index 21e6ffbb41..db258e03e5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneSecurityUtil.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneSecurityUtil.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 
 import org.apache.commons.validator.routines.InetAddressValidator;
+
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_HTTP_SECURITY_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_HTTP_SECURITY_ENABLED_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_DEFAULT;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index a20bcd2003..30dc3199a1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.datanode.metadata.DatanodeCRLStore;
 import org.apache.hadoop.hdds.datanode.metadata.DatanodeCRLStoreImpl;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.security.symmetric.DefaultSecretKeyClient;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.DNCertificateClient;
@@ -96,6 +98,7 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
   private DatanodeStateMachine datanodeStateMachine;
   private List<ServicePlugin> plugins;
   private CertificateClient dnCertClient;
+  private SecretKeyClient secretKeyClient;
   private String component;
   private HddsDatanodeHttpServer httpServer;
   private boolean printBanner;
@@ -290,9 +293,14 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
 
       if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
         dnCertClient = initializeCertificateClient(dnCertClient);
+
+        if (secConf.isTokenEnabled()) {
+          secretKeyClient = DefaultSecretKeyClient.create(conf);
+          secretKeyClient.start(conf);
+        }
       }
       datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf,
-          dnCertClient, this::terminateDatanode, dnCRLStore);
+          dnCertClient, secretKeyClient, this::terminateDatanode, dnCRLStore);
       try {
         httpServer = new HddsDatanodeHttpServer(conf);
         httpServer.start();
@@ -548,6 +556,10 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
         LOG.error("Datanode CRL store stop failed", ex);
       }
       RatisDropwizardExports.clear(ratisMetricsMap, ratisReporterList);
+
+      if (secretKeyClient != null) {
+        secretKeyClient.stop();
+      }
     }
   }
 
@@ -586,6 +598,11 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
     dnCertClient = client;
   }
 
+  @VisibleForTesting
+  public void setSecretKeyClient(SecretKeyClient client) {
+    this.secretKeyClient = client;
+  }
+
   @Override
   public void printError(Throwable error) {
     LOG.error("Exception in HddsDatanodeService.", error);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 5536b02e54..2373496491 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
 import org.apache.hadoop.hdds.utils.IOUtils;
@@ -139,6 +140,7 @@ public class DatanodeStateMachine implements Closeable {
   public DatanodeStateMachine(DatanodeDetails datanodeDetails,
                               ConfigurationSource conf,
                               CertificateClient certClient,
+                              SecretKeyClient secretKeyClient,
                               HddsDatanodeStopService hddsDatanodeStopService,
                               DatanodeCRLStore crlStore) throws IOException {
     DatanodeConfiguration dnConf =
@@ -171,7 +173,7 @@ public class DatanodeStateMachine implements Closeable {
     constructionLock.writeLock().lock();
     try {
       container = new OzoneContainer(this.datanodeDetails,
-          conf, context, certClient);
+          conf, context, certClient, secretKeyClient);
     } finally {
       constructionLock.writeLock().unlock();
     }
@@ -204,7 +206,7 @@ public class DatanodeStateMachine implements Closeable {
     ecReconstructionMetrics = ECReconstructionMetrics.create();
 
     ecReconstructionCoordinator = new ECReconstructionCoordinator(
-        conf, certClient, context, ecReconstructionMetrics);
+        conf, certClient, secretKeyClient, context, ecReconstructionMetrics);
 
     // This is created as an instance variable as Mockito needs to access it in
     // a test. The test mocks it in a running mini-cluster.
@@ -245,6 +247,12 @@ public class DatanodeStateMachine implements Closeable {
     queueMetrics = DatanodeQueueMetrics.create(this);
   }
 
+  @VisibleForTesting
+  public DatanodeStateMachine(DatanodeDetails datanodeDetails,
+                              ConfigurationSource conf) throws IOException {
+    this(datanodeDetails, conf, null, null, null, null);
+  }
+
   private int getEndPointTaskThreadPoolSize() {
     // TODO(runzhiwang): current only support one recon, if support multiple
     //  recon in future reconServerCount should be the real number of recon
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index a7e5bfb92d..7d258819eb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
 import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.utils.IOUtils;
@@ -110,9 +111,9 @@ public class ECReconstructionCoordinator implements Closeable {
   private final ECReconstructionMetrics metrics;
   private final StateContext context;
 
-  public ECReconstructionCoordinator(ConfigurationSource conf,
-      CertificateClient certificateClient,
-      StateContext context,
+  public ECReconstructionCoordinator(
+      ConfigurationSource conf, CertificateClient certificateClient,
+      SecretKeySignerClient secretKeyClient, StateContext context,
       ECReconstructionMetrics metrics) throws IOException {
     this.context = context;
     this.containerOperationClient = new ECContainerOperationClient(conf,
@@ -128,7 +129,7 @@ public class ECReconstructionCoordinator implements Closeable {
             new ThreadPoolExecutor.CallerRunsPolicy());
     this.blockInputStreamFactory = BlockInputStreamFactoryImpl
         .getInstance(byteBufferPool, () -> ecReconstructExecutor);
-    tokenHelper = new TokenHelper(conf, certificateClient);
+    tokenHelper = new TokenHelper(conf, secretKeyClient);
     this.clientMetrics = ContainerClientMetrics.acquire();
     this.metrics = metrics;
   }
@@ -390,7 +391,6 @@ public class ECReconstructionCoordinator implements Closeable {
     if (containerOperationClient != null) {
       containerOperationClient.close();
     }
-    tokenHelper.stop();
   }
 
   private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/TokenHelper.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/TokenHelper.java
index ffd7632804..682b9dc147 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/TokenHelper.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/TokenHelper.java
@@ -22,17 +22,16 @@ import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
 import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
 import org.apache.hadoop.hdds.security.token.ContainerTokenSecretManager;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
-import java.time.Duration;
 import java.util.EnumSet;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -52,7 +51,7 @@ class TokenHelper {
   private static final Set<AccessModeProto> MODES =
       EnumSet.of(READ, WRITE, DELETE);
 
-  TokenHelper(ConfigurationSource conf, CertificateClient certClient)
+  TokenHelper(ConfigurationSource conf, SecretKeySignerClient secretKeyClient)
       throws IOException {
 
     SecurityConfig securityConfig = new SecurityConfig(conf);
@@ -61,7 +60,7 @@ class TokenHelper {
 
     // checking certClient != null instead of securityConfig.isSecurityEnabled()
     // to allow integration test without full kerberos etc. setup
-    boolean securityEnabled = certClient != null;
+    boolean securityEnabled = secretKeyClient != null;
 
     if (securityEnabled && (blockTokenEnabled || containerTokenEnabled)) {
       user = UserGroupInformation.getCurrentUser().getShortUserName();
@@ -70,29 +69,17 @@ class TokenHelper {
           HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME,
           HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME_DEFAULT,
           TimeUnit.MILLISECONDS);
-      long certificateGracePeriod = Duration.parse(
-          conf.get(HddsConfigKeys.HDDS_X509_RENEW_GRACE_DURATION,
-              HddsConfigKeys.HDDS_X509_RENEW_GRACE_DURATION_DEFAULT))
-          .toMillis();
-      if (expiryTime > certificateGracePeriod) {
-        throw new IllegalArgumentException("Certificate grace period " +
-            HddsConfigKeys.HDDS_X509_RENEW_GRACE_DURATION +
-            " should be greater than maximum block/container token lifetime " +
-            HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME);
-      }
 
       if (blockTokenEnabled) {
-        blockTokenMgr = new OzoneBlockTokenSecretManager(
-            securityConfig, expiryTime);
-        blockTokenMgr.start(certClient);
+        blockTokenMgr = new OzoneBlockTokenSecretManager(expiryTime,
+            secretKeyClient);
       } else {
         blockTokenMgr = null;
       }
 
       if (containerTokenEnabled) {
-        containerTokenMgr = new ContainerTokenSecretManager(
-            securityConfig, expiryTime);
-        containerTokenMgr.start(certClient);
+        containerTokenMgr = new ContainerTokenSecretManager(expiryTime,
+            secretKeyClient);
       } else {
         containerTokenMgr = null;
       }
@@ -103,23 +90,6 @@ class TokenHelper {
     }
   }
 
-  void stop() {
-    if (blockTokenMgr != null) {
-      try {
-        blockTokenMgr.stop();
-      } catch (IOException ignored) {
-        // no threads involved, cannot really happen
-      }
-    }
-    if (containerTokenMgr != null) {
-      try {
-        containerTokenMgr.stop();
-      } catch (IOException ignored) {
-        // no threads involved, cannot really happen
-      }
-    }
-  }
-
   Token<OzoneBlockTokenIdentifier> getBlockToken(BlockID blockID, long length) {
     return blockTokenMgr != null
         ? blockTokenMgr.generateToken(user, blockID, MODES, length)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index a1d159ef35..9e4207cf75 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyVerifierClient;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
@@ -137,7 +138,8 @@ public class OzoneContainer {
    */
   public OzoneContainer(
       DatanodeDetails datanodeDetails, ConfigurationSource conf,
-      StateContext context, CertificateClient certClient) throws IOException {
+      StateContext context, CertificateClient certClient,
+      SecretKeyVerifierClient secretKeyClient) throws IOException {
     config = conf;
     this.datanodeDetails = datanodeDetails;
     this.context = context;
@@ -191,7 +193,8 @@ public class OzoneContainer {
 
     SecurityConfig secConf = new SecurityConfig(conf);
     hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
-        handlers, context, metrics, TokenVerifier.create(secConf, certClient));
+        handlers, context, metrics,
+        TokenVerifier.create(secConf, secretKeyClient));
 
     /*
      * ContainerController is the control plane
@@ -264,6 +267,16 @@ public class OzoneContainer {
         new AtomicReference<>(InitializingStatus.UNINITIALIZED);
   }
 
+  /**
+   * Shorthand constructor used for testing in non-secure context.
+   */
+  @VisibleForTesting
+  public OzoneContainer(
+      DatanodeDetails datanodeDetails, ConfigurationSource conf,
+      StateContext context) throws IOException {
+    this(datanodeDetails, conf, context, null, null);
+  }
+
   public GrpcTlsConfig getTlsClientConfig() {
     return tlsClientConfig;
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index 31d5000acb..9109be4627 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -107,7 +107,7 @@ public final class ContainerTestUtils {
     StateContext context = Mockito.mock(StateContext.class);
     Mockito.when(stateMachine.getDatanodeDetails()).thenReturn(datanodeDetails);
     Mockito.when(context.getParent()).thenReturn(stateMachine);
-    return new OzoneContainer(datanodeDetails, conf, context, null);
+    return new OzoneContainer(datanodeDetails, conf, context);
   }
 
   public static DatanodeDetails createDatanodeDetails() {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index f2faeaa3a3..944f6286c5 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -157,8 +157,7 @@ public class TestDatanodeStateMachine {
   public void testStartStopDatanodeStateMachine() throws IOException,
       InterruptedException, TimeoutException {
     try (DatanodeStateMachine stateMachine =
-        new DatanodeStateMachine(getNewDatanodeDetails(), conf, null, null,
-            null)) {
+        new DatanodeStateMachine(getNewDatanodeDetails(), conf)) {
       stateMachine.startDaemon();
       SCMConnectionManager connectionManager =
           stateMachine.getConnectionManager();
@@ -220,8 +219,7 @@ public class TestDatanodeStateMachine {
     datanodeDetails.setPort(port);
     ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
     try (DatanodeStateMachine stateMachine =
-             new DatanodeStateMachine(datanodeDetails, conf, null, null,
-                 null)) {
+             new DatanodeStateMachine(datanodeDetails, conf)) {
       DatanodeStateMachine.DatanodeStates currentState =
           stateMachine.getContext().getState();
       Assertions.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -343,8 +341,7 @@ public class TestDatanodeStateMachine {
     datanodeDetails.setPort(port);
 
     try (DatanodeStateMachine stateMachine =
-             new DatanodeStateMachine(datanodeDetails, conf, null, null,
-                 null)) {
+             new DatanodeStateMachine(datanodeDetails, conf)) {
       DatanodeStateMachine.DatanodeStates currentState =
           stateMachine.getContext().getState();
       Assertions.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -402,7 +399,7 @@ public class TestDatanodeStateMachine {
       perTestConf.setStrings(entry.getKey(), entry.getValue());
       LOG.info("Test with {} = {}", entry.getKey(), entry.getValue());
       try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
-          getNewDatanodeDetails(), perTestConf, null, null, null)) {
+          getNewDatanodeDetails(), perTestConf)) {
         DatanodeStateMachine.DatanodeStates currentState =
             stateMachine.getContext().getState();
         Assertions.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index cceed79390..f06eea2ec1 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -178,7 +178,7 @@ public class TestOzoneContainer {
     // loaded into the containerSet.
     // Also expected to initialize committed space for each volume.
     OzoneContainer ozoneContainer = new
-        OzoneContainer(datanodeDetails, conf, context, null);
+        OzoneContainer(datanodeDetails, conf, context);
 
     ContainerSet containerset = ozoneContainer.getContainerSet();
     assertEquals(numTestContainers, containerset.containerCount());
@@ -213,7 +213,7 @@ public class TestOzoneContainer {
     // loaded into the containerSet.
     // Also expected to initialize committed space for each volume.
     OzoneContainer ozoneContainer = new
-            OzoneContainer(datanodeDetails, conf, context, null);
+            OzoneContainer(datanodeDetails, conf, context);
     Assert.assertEquals(volumeSet.getVolumesList().size(),
             ozoneContainer.getNodeReport().getStorageReportList().size());
     Assert.assertEquals(3,
@@ -234,7 +234,7 @@ public class TestOzoneContainer {
     // loaded into the containerSet.
     // Also expected to initialize committed space for each volume.
     OzoneContainer ozoneContainer = new
-            OzoneContainer(datanodeDetails, conf, context, null);
+            OzoneContainer(datanodeDetails, conf, context);
     Assert.assertEquals(volumeSet.getVolumesList().size(),
             ozoneContainer.getNodeReport().getStorageReportList().size());
     Assert.assertEquals(1,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDataNodeStartupSlvLessThanMlv.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDataNodeStartupSlvLessThanMlv.java
index 306cdb77b3..82a9db0377 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDataNodeStartupSlvLessThanMlv.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDataNodeStartupSlvLessThanMlv.java
@@ -65,8 +65,7 @@ public class TestDataNodeStartupSlvLessThanMlv {
         HddsProtos.NodeType.DATANODE, mlv);
 
     try {
-      new DatanodeStateMachine(getNewDatanodeDetails(), conf, null,
-                   null, null);
+      new DatanodeStateMachine(getNewDatanodeDetails(), conf);
       Assert.fail("Expected IOException due to incorrect MLV on DataNode " +
           "creation.");
     } catch (IOException e) {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
index 7af39e5e7f..1d7c889dd0 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToSchemaV3.java
@@ -222,7 +222,7 @@ public class TestDatanodeUpgradeToSchemaV3 {
         HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
     layoutStorage.initialize();
     dsm = new DatanodeStateMachine(
-        ContainerTestUtils.createDatanodeDetails(), conf, null, null, null);
+        ContainerTestUtils.createDatanodeDetails(), conf);
     HddsVolume dataVolume = (
         HddsVolume) dsm.getContainer().getVolumeSet().getVolumesList().get(0);
     // Format HddsVolume to mimic the real cluster upgrade situation
@@ -489,7 +489,7 @@ public class TestDatanodeUpgradeToSchemaV3 {
         HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion());
     layoutStorage.initialize();
     dsm = new DatanodeStateMachine(
-        ContainerTestUtils.createDatanodeDetails(), conf, null, null, null);
+        ContainerTestUtils.createDatanodeDetails(), conf);
     HddsVolume dataVolume = (
         HddsVolume) dsm.getContainer().getVolumeSet().getVolumesList().get(0);
     // Format HddsVolume to mimic the real cluster upgrade situation
@@ -588,8 +588,7 @@ public class TestDatanodeUpgradeToSchemaV3 {
 
     // Build and start the datanode.
     DatanodeDetails dd = ContainerTestUtils.createDatanodeDetails();
-    DatanodeStateMachine newDsm = new DatanodeStateMachine(dd,
-        conf, null, null, null);
+    DatanodeStateMachine newDsm = new DatanodeStateMachine(dd, conf);
     int actualMlv = newDsm.getLayoutVersionManager().getMetadataLayoutVersion();
     Assert.assertEquals(
         HDDSLayoutFeature.ERASURE_CODED_STORAGE_SUPPORT.layoutVersion(),
@@ -609,8 +608,7 @@ public class TestDatanodeUpgradeToSchemaV3 {
     dsm.close();
 
     // Start new datanode with the same configuration.
-    dsm = new DatanodeStateMachine(dd,
-        conf, null, null, null);
+    dsm = new DatanodeStateMachine(dd, conf);
     int mlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
     if (exactMatch) {
       Assert.assertEquals(expectedMlv, mlv);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
index 3ff347fbdf..79f55001c7 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/upgrade/TestDatanodeUpgradeToScmHA.java
@@ -516,9 +516,7 @@ public class TestDatanodeUpgradeToScmHA {
 
     // Build and start the datanode.
     DatanodeDetails dd = ContainerTestUtils.createDatanodeDetails();
-    DatanodeStateMachine newDsm = new DatanodeStateMachine(dd,
-        conf, null, null,
-        null);
+    DatanodeStateMachine newDsm = new DatanodeStateMachine(dd, conf);
     int actualMlv = newDsm.getLayoutVersionManager().getMetadataLayoutVersion();
     Assert.assertEquals(HDDSLayoutFeature.INITIAL_VERSION.layoutVersion(),
         actualMlv);
@@ -534,9 +532,7 @@ public class TestDatanodeUpgradeToScmHA {
     dsm.close();
 
     // Start new datanode with the same configuration.
-    dsm = new DatanodeStateMachine(dd,
-        conf, null, null,
-        null);
+    dsm = new DatanodeStateMachine(dd, conf);
     int mlv = dsm.getLayoutVersionManager().getMetadataLayoutVersion();
     if (exactMatch) {
       Assert.assertEquals(expectedMlv, mlv);
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyClient.java
new file mode 100644
index 0000000000..d77fee778e
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyClient.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.security.symmetric;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
+
+/**
+ * A composition of {@link DefaultSecretKeySignerClient} and
+ * {@link DefaultSecretKeyVerifierClient} for components need both APIs.
+ */
+public class DefaultSecretKeyClient implements SecretKeyClient {
+  private final SecretKeySignerClient signerClientDelegate;
+  private final SecretKeyVerifierClient verifierClientDelegate;
+
+
+  DefaultSecretKeyClient(SecretKeySignerClient signerClientDelegate,
+                         SecretKeyVerifierClient verifierClientDelegate) {
+    this.signerClientDelegate = signerClientDelegate;
+    this.verifierClientDelegate = verifierClientDelegate;
+  }
+
+
+  @Override
+  public ManagedSecretKey getCurrentSecretKey() {
+    return signerClientDelegate.getCurrentSecretKey();
+  }
+
+  @Override
+  public void start(ConfigurationSource conf) throws IOException {
+    signerClientDelegate.start(conf);
+  }
+
+  @Override
+  public void stop() {
+    signerClientDelegate.stop();
+  }
+
+  @Override
+  public ManagedSecretKey getSecretKey(UUID id) throws SCMSecurityException {
+    return verifierClientDelegate.getSecretKey(id);
+  }
+
+  public static SecretKeyClient create(ConfigurationSource conf)
+      throws IOException {
+    SCMSecurityProtocol securityProtocol = getScmSecurityClient(conf);
+    SecretKeySignerClient singerClient =
+        new DefaultSecretKeySignerClient(securityProtocol);
+    SecretKeyVerifierClient verifierClient =
+        new DefaultSecretKeyVerifierClient(securityProtocol, conf);
+    return new DefaultSecretKeyClient(singerClient, verifierClient);
+  }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java
new file mode 100644
index 0000000000..a1056f9139
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.security.symmetric;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
+
+/**
+ * Default implementation of {@link SecretKeySignerClient} that fetches
+ * secret keys from SCM. This client implements a background thread that
+ * periodically check and get the latest current secret key from SCM.
+ */
+public class DefaultSecretKeySignerClient implements SecretKeySignerClient {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DefaultSecretKeySignerClient.class);
+
+  private final SCMSecurityProtocol scmSecurityProtocol;
+  private final AtomicReference<ManagedSecretKey> cache =
+      new AtomicReference<>();
+  private ScheduledExecutorService executorService;
+
+  public DefaultSecretKeySignerClient(
+      SCMSecurityProtocol scmSecurityProtocol) {
+    this.scmSecurityProtocol = scmSecurityProtocol;
+  }
+
+  @Override
+  public ManagedSecretKey getCurrentSecretKey() {
+    return requireNonNull(cache.get(),
+        "SecretKey client must have been initialized already.");
+  }
+
+  @Override
+  public void start(ConfigurationSource conf) throws IOException {
+    final ManagedSecretKey initialKey =
+        scmSecurityProtocol.getCurrentSecretKey();
+    LOG.info("Initial secret key fetched from SCM: {}.", initialKey);
+    cache.set(initialKey);
+    scheduleSecretKeyPoller(conf, initialKey.getCreationTime());
+  }
+
+  @Override
+  public void stop() {
+    executorService.shutdown();
+    try {
+      if (executorService.awaitTermination(1, TimeUnit.MINUTES)) {
+        executorService.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while shutting down executor service.", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void scheduleSecretKeyPoller(ConfigurationSource conf,
+                                       Instant initialCreation) {
+    Duration rotateDuration = SecretKeyConfig.parseRotateDuration(conf);
+    Instant nextRotate = initialCreation.plus(rotateDuration);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("SecretKeyPoller")
+        .setDaemon(true)
+        .build();
+    executorService = Executors.newScheduledThreadPool(1, threadFactory);
+    Duration interval = SecretKeyConfig.parseRotateCheckDuration(conf);
+    Duration initialDelay = Duration.between(Instant.now(), nextRotate);
+
+    LOG.info("Scheduling SecretKeyPoller with initial delay of {} " +
+        "and interval of {}", initialDelay, interval);
+    executorService.scheduleAtFixedRate(() -> checkAndRefresh(rotateDuration),
+        initialDelay.toMillis(), interval.toMillis(),
+        TimeUnit.MILLISECONDS);
+  }
+
+  private void checkAndRefresh(Duration rotateDuration) {
+    ManagedSecretKey current = cache.get();
+    Instant nextRotate = current.getCreationTime().plus(rotateDuration);
+    // when the current key passes the rotation cycle, fetch the next one
+    // from SCM.
+    if (nextRotate.isBefore(Instant.now())) {
+      try {
+        ManagedSecretKey newKey = scmSecurityProtocol.getCurrentSecretKey();
+        if (!newKey.equals(current)) {
+          cache.set(newKey);
+          LOG.info("New secret key fetched from SCM: {}.", newKey);
+        }
+      } catch (IOException e) {
+        // TODO: emic failure metrics.
+        throw new UncheckedIOException(
+            "Error fetching current key from SCM", e);
+      }
+    }
+  }
+
+  public static DefaultSecretKeySignerClient create(ConfigurationSource conf)
+      throws IOException {
+    SCMSecurityProtocol securityProtocol = getScmSecurityClient(conf);
+    return new DefaultSecretKeySignerClient(securityProtocol);
+  }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyVerifierClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyVerifierClient.java
new file mode 100644
index 0000000000..56478793cb
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyVerifierClient.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.security.symmetric;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdds.security.symmetric.SecretKeyConfig.parseExpiryDuration;
+import static org.apache.hadoop.hdds.security.symmetric.SecretKeyConfig.parseRotateDuration;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
+
+/**
+ * Default implementation of {@link SecretKeyVerifierClient} that fetches
+ * SecretKeys remotely via {@link SCMSecurityProtocol}.
+ */
+public class DefaultSecretKeyVerifierClient implements SecretKeyVerifierClient {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DefaultSecretKeyVerifierClient.class);
+
+  private final LoadingCache<UUID, ManagedSecretKey> cache;
+
+  DefaultSecretKeyVerifierClient(SCMSecurityProtocol scmSecurityProtocol,
+                                 ConfigurationSource conf) {
+    Duration expiryDuration = parseExpiryDuration(conf);
+    Duration rotateDuration = parseRotateDuration(conf);
+    long cacheSize = expiryDuration.toMillis() / rotateDuration.toMillis() + 1;
+
+    CacheLoader<UUID, ManagedSecretKey> loader =
+        new CacheLoader<UUID, ManagedSecretKey>() {
+          @Override
+          public ManagedSecretKey load(UUID id) throws Exception {
+            ManagedSecretKey secretKey = scmSecurityProtocol.getSecretKey(id);
+            LOG.info("Secret key fetched from SCM: {}", secretKey);
+            return secretKey;
+          }
+        };
+
+    LOG.info("Initializing secret key cache with size {}, TTL {}",
+        cacheSize, expiryDuration);
+    cache = CacheBuilder.newBuilder()
+        .maximumSize(cacheSize)
+        .expireAfterWrite(expiryDuration.toMillis(), TimeUnit.MILLISECONDS)
+        .recordStats()
+        .build(loader);
+  }
+
+  @Override
+  public ManagedSecretKey getSecretKey(UUID id) throws SCMSecurityException {
+    try {
+      return cache.get(id);
+    } catch (ExecutionException e) {
+      // handle cache load exception.
+      if (e.getCause() instanceof IOException) {
+        IOException cause = (IOException) e.getCause();
+        if (cause instanceof SCMSecurityException) {
+          throw (SCMSecurityException) cause;
+        } else {
+          throw new SCMSecurityException(
+              "Error fetching secret key " + id + " from SCM", cause);
+        }
+      }
+      throw new IllegalStateException("Unexpected exception fetching secret " +
+          "key " + id + " from SCM", e.getCause());
+    }
+  }
+
+  public static DefaultSecretKeyVerifierClient create(ConfigurationSource conf)
+      throws IOException {
+    SCMSecurityProtocol securityProtocol = getScmSecurityClient(conf);
+    return new DefaultSecretKeyVerifierClient(securityProtocol, conf);
+  }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
index 3128265e9a..78e4fc0b90 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
@@ -20,11 +20,15 @@ package org.apache.hadoop.hdds.security.symmetric;
 
 import com.google.protobuf.ByteString;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ProtobufUtils;
 
+import javax.crypto.Mac;
 import javax.crypto.SecretKey;
 import javax.crypto.spec.SecretKeySpec;
-import java.io.Serializable;
+import java.security.InvalidKeyException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.time.Instant;
 import java.util.UUID;
 
@@ -32,11 +36,12 @@ import java.util.UUID;
  * Enclosed a symmetric {@link SecretKey} with additional data for life-cycle
  * management.
  */
-public final class ManagedSecretKey implements Serializable {
+public final class ManagedSecretKey {
   private final UUID id;
   private final Instant creationTime;
   private final Instant expiryTime;
   private final SecretKey secretKey;
+  private final ThreadLocal<Mac> macInstances;
 
   public ManagedSecretKey(UUID id,
                           Instant creationTime,
@@ -46,6 +51,16 @@ public final class ManagedSecretKey implements Serializable {
     this.creationTime = creationTime;
     this.expiryTime = expiryTime;
     this.secretKey = secretKey;
+
+    // This help reuse Mac instances for the same thread.
+    macInstances = ThreadLocal.withInitial(() -> {
+      try {
+        return Mac.getInstance(secretKey.getAlgorithm());
+      } catch (NoSuchAlgorithmException e) {
+        throw new IllegalArgumentException(
+            "Invalid algorithm " + secretKey.getAlgorithm(), e);
+      }
+    });
   }
 
   public boolean isExpired() {
@@ -88,6 +103,29 @@ public final class ManagedSecretKey implements Serializable {
         + creationTime + ", expire at: " + expiryTime + ")";
   }
 
+  public byte[] sign(byte[] data) {
+    try {
+      Mac mac = macInstances.get();
+      mac.init(secretKey);
+      return mac.doFinal(data);
+    } catch (InvalidKeyException e) {
+      throw new IllegalArgumentException("Invalid key to HMAC computation", e);
+    }
+  }
+
+  public byte[] sign(TokenIdentifier tokenId) {
+    return sign(tokenId.getBytes());
+  }
+
+  public boolean isValidSignature(byte[] data, byte[] signature) {
+    byte[] expectedSignature = sign(data);
+    return MessageDigest.isEqual(expectedSignature, signature);
+  }
+
+  public boolean isValidSignature(TokenIdentifier tokenId, byte[] signature) {
+    return isValidSignature(tokenId.getBytes(), signature);
+  }
+
   /**
    * @return the protobuf message to deserialize this object.
    */
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyClient.java
new file mode 100644
index 0000000000..a71b14dc3f
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyClient.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.security.symmetric;
+
+/**
+ * Composite client for those components that need to perform both signing
+ * and verifying.
+ */
+public interface SecretKeyClient extends SecretKeySignerClient,
+    SecretKeyVerifierClient {
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyConfig.java
index f2a9181051..a833ba0137 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyConfig.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyConfig.java
@@ -58,23 +58,33 @@ public class SecretKeyConfig {
         HDDS_SECRET_KEY_FILE_DEFAULT);
     localSecretKeyFile = Paths.get(metadataDir, component, keyDir, fileName);
 
-    long rotateDurationInMs = conf.getTimeDuration(
-        HDDS_SECRET_KEY_ROTATE_DURATION,
-        HDDS_SECRET_KEY_ROTATE_DURATION_DEFAULT, TimeUnit.MILLISECONDS);
-    this.rotateDuration = Duration.ofMillis(rotateDurationInMs);
+    this.rotateDuration = parseRotateDuration(conf);
+    this.expiryDuration = parseExpiryDuration(conf);
+    this.rotationCheckDuration = parseRotateCheckDuration(conf);
+
+    this.algorithm = conf.get(HDDS_SECRET_KEY_ALGORITHM,
+        HDDS_SECRET_KEY_ALGORITHM_DEFAULT);
+  }
 
+  public static Duration parseExpiryDuration(ConfigurationSource conf) {
     long expiryDurationInMs = conf.getTimeDuration(
         HDDS_SECRET_KEY_EXPIRY_DURATION,
         HDDS_SECRET_KEY_EXPIRY_DURATION_DEFAULT, TimeUnit.MILLISECONDS);
-    this.expiryDuration = Duration.ofMillis(expiryDurationInMs);
+    return Duration.ofMillis(expiryDurationInMs);
+  }
 
-    this.algorithm = conf.get(HDDS_SECRET_KEY_ALGORITHM,
-        HDDS_SECRET_KEY_ALGORITHM_DEFAULT);
+  public static Duration parseRotateDuration(ConfigurationSource conf) {
+    long rotateDurationInMs = conf.getTimeDuration(
+        HDDS_SECRET_KEY_ROTATE_DURATION,
+        HDDS_SECRET_KEY_ROTATE_DURATION_DEFAULT, TimeUnit.MILLISECONDS);
+    return Duration.ofMillis(rotateDurationInMs);
+  }
 
+  public static Duration parseRotateCheckDuration(ConfigurationSource conf) {
     long rotationCheckInMs = conf.getTimeDuration(
         HDDS_SECRET_KEY_ROTATE_CHECK_DURATION,
         HDDS_SECRET_KEY_ROTATE_CHECK_DURATION_DEFAULT, TimeUnit.MILLISECONDS);
-    this.rotationCheckDuration = Duration.ofMillis(rotationCheckInMs);
+    return Duration.ofMillis(rotationCheckInMs);
   }
 
   public Path getLocalSecretKeyFile() {
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
index cb529e10d1..f7a481cc05 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManager.java
@@ -37,7 +37,7 @@ import static java.util.stream.Collectors.toList;
  * This component manages symmetric SecretKey life-cycle, including generation,
  * rotation and destruction.
  */
-public class SecretKeyManager {
+public class SecretKeyManager implements SecretKeyClient {
   private static final Logger LOG =
       LoggerFactory.getLogger(SecretKeyManager.class);
 
@@ -129,11 +129,13 @@ public class SecretKeyManager {
     return false;
   }
 
-  public ManagedSecretKey getCurrentKey() {
+  @Override
+  public ManagedSecretKey getCurrentSecretKey() {
     return state.getCurrentKey();
   }
 
-  public ManagedSecretKey getKey(UUID id) {
+  @Override
+  public ManagedSecretKey getSecretKey(UUID id) {
     return state.getKey(id);
   }
 
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeySignerClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeySignerClient.java
new file mode 100644
index 0000000000..d05786db5c
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeySignerClient.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.security.symmetric;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+
+import java.io.IOException;
+
+/**
+ * Define the client-side API that the token signers (like OM) uses to retrieve
+ * the secret key to sign data.
+ */
+public interface SecretKeySignerClient {
+  ManagedSecretKey getCurrentSecretKey();
+
+  /**
+   * This is where the actual implementation can  prefetch the current
+   * secret key or initialize ay necessary resources, e.g. cache or executors.
+   */
+  default void start(ConfigurationSource conf) throws IOException {
+  }
+
+  /**
+   * Give a chance for the implementation to clean up acquired resources.
+   */
+  default void stop() {
+  }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
index b1d66e1186..727b005d2b 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyStateImpl.java
@@ -127,5 +127,4 @@ public final class SecretKeyStateImpl implements SecretKeyState {
       lock.writeLock().unlock();
     }
   }
-
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyVerifierClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyVerifierClient.java
new file mode 100644
index 0000000000..59f49f72f1
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyVerifierClient.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.security.symmetric;
+
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+
+import java.util.UUID;
+
+/**
+ * Define the client-side API that the token verifiers (or datanodes) use to
+ * retrieve the relevant secret key to validate token authority.
+ */
+public interface SecretKeyVerifierClient {
+  ManagedSecretKey getSecretKey(UUID id) throws SCMSecurityException;
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
index 996ed7ae68..c9999d253b 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProtoOrBuilder;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyVerifierClient;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,8 +54,9 @@ public class BlockTokenVerifier extends
     return String.valueOf(blockID);
   }
 
-  public BlockTokenVerifier(SecurityConfig conf, CertificateClient caClient) {
-    super(conf, caClient);
+  public BlockTokenVerifier(SecurityConfig conf,
+                            SecretKeyVerifierClient secretKeyClient) {
+    super(conf, secretKeyClient);
   }
 
   @Override
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ContainerTokenSecretManager.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ContainerTokenSecretManager.java
index e9f37f3de1..4cee87696f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ContainerTokenSecretManager.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ContainerTokenSecretManager.java
@@ -20,11 +20,9 @@ package org.apache.hadoop.hdds.security.token;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -38,17 +36,15 @@ public class ContainerTokenSecretManager
     extends ShortLivedTokenSecretManager<ContainerTokenIdentifier>
     implements ContainerTokenGenerator {
 
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerTokenSecretManager.class);
-
-  public ContainerTokenSecretManager(SecurityConfig conf, long tokenLifetime) {
-    super(conf, tokenLifetime, LOG);
+  public ContainerTokenSecretManager(long tokenLifetime,
+                                     SecretKeySignerClient secretKeyClient) {
+    super(tokenLifetime, secretKeyClient);
   }
 
   public ContainerTokenIdentifier createIdentifier(String user,
       ContainerID containerID) {
     return new ContainerTokenIdentifier(user, containerID,
-        getCertSerialId(), getTokenExpiryTime());
+        getTokenExpiryTime());
   }
 
   @Override
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ContainerTokenVerifier.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ContainerTokenVerifier.java
index 941160a042..7e4d186c32 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ContainerTokenVerifier.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ContainerTokenVerifier.java
@@ -21,16 +21,16 @@ import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProtoOrBuilder;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyVerifierClient;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 
 /** Verifier for container tokens. */
 public class ContainerTokenVerifier extends
     ShortLivedTokenVerifier<ContainerTokenIdentifier> {
 
   public ContainerTokenVerifier(SecurityConfig conf,
-      CertificateClient caClient) {
-    super(conf, caClient);
+      SecretKeyVerifierClient secretKeyClient) {
+    super(conf, secretKeyClient);
   }
 
   @Override
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenSecretManager.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenSecretManager.java
index 3dc7a395a1..1192377e19 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenSecretManager.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/OzoneBlockTokenSecretManager.java
@@ -21,11 +21,10 @@ import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,24 +39,19 @@ import java.util.Set;
 @InterfaceStability.Unstable
 public class OzoneBlockTokenSecretManager extends
     ShortLivedTokenSecretManager<OzoneBlockTokenIdentifier> {
-
   private static final Logger LOG = LoggerFactory
       .getLogger(OzoneBlockTokenSecretManager.class);
 
-  public OzoneBlockTokenSecretManager(SecurityConfig conf, long tokenLifetime) {
-    super(conf, tokenLifetime, LOG);
-  }
-
-  @Override
-  public OzoneBlockTokenIdentifier createIdentifier() {
-    throw new SecurityException("Ozone block token can't be created "
-        + "without owner and access mode information.");
+  public OzoneBlockTokenSecretManager(long tokenLifetime,
+                                      SecretKeySignerClient passwordManager) {
+    super(tokenLifetime, passwordManager);
   }
 
   public OzoneBlockTokenIdentifier createIdentifier(String owner,
       BlockID blockID, Set<AccessModeProto> modes, long maxLength) {
     return new OzoneBlockTokenIdentifier(owner, blockID, modes,
-        getTokenExpiryTime().toEpochMilli(), getCertSerialId(), maxLength);
+        getTokenExpiryTime().toEpochMilli(),
+        maxLength);
   }
 
   /**
@@ -73,8 +67,9 @@ public class OzoneBlockTokenSecretManager extends
       LOG.info("Issued delegation token -> expiryTime:{}, tokenId:{}",
           Instant.ofEpochMilli(expiryTime), tokenIdentifier);
     }
+    byte[] password = createPassword(tokenIdentifier);
     return new Token<>(tokenIdentifier.getBytes(),
-        createPassword(tokenIdentifier), tokenIdentifier.getKind(),
+        password, tokenIdentifier.getKind(),
         new Text(tokenIdentifier.getService()));
   }
 
@@ -88,49 +83,4 @@ public class OzoneBlockTokenSecretManager extends
     return generateToken(userID, blockId, modes, maxLength);
   }
 
-  @Override
-  public byte[] retrievePassword(OzoneBlockTokenIdentifier identifier)
-      throws InvalidToken {
-    validateToken(identifier);
-    return createPassword(identifier);
-  }
-
-  @Override
-  public long renewToken(Token<OzoneBlockTokenIdentifier> token,
-      String renewer) {
-    throw new UnsupportedOperationException("Renew token operation is not " +
-        "supported for ozone block tokens.");
-  }
-
-  @Override
-  public OzoneBlockTokenIdentifier cancelToken(Token<OzoneBlockTokenIdentifier>
-      token, String canceller) {
-    throw new UnsupportedOperationException("Cancel token operation is not " +
-        "supported for ozone block tokens.");
-  }
-
-  /**
-   * Find the OzoneBlockTokenInfo for the given token id, and verify that if the
-   * token is not expired.
-   */
-  @Override
-  public boolean validateToken(OzoneBlockTokenIdentifier identifier)
-      throws InvalidToken {
-    long now = Time.now();
-    if (identifier.getExpiryDate() < now) {
-      throw new InvalidToken("token " + formatTokenId(identifier) + " is " +
-          "expired, current time: " + Time.formatTime(now) +
-          " expiry time: " + identifier.getExpiryDate());
-    }
-    return true;
-  }
-
-  /**
-   * Validates if given hash is valid.
-   */
-  public boolean verifySignature(OzoneBlockTokenIdentifier identifier,
-      byte[] password) {
-    throw new UnsupportedOperationException("This operation is not " +
-        "supported for block tokens.");
-  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ShortLivedTokenSecretManager.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ShortLivedTokenSecretManager.java
index 966cf1cf5f..981ea77a41 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ShortLivedTokenSecretManager.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ShortLivedTokenSecretManager.java
@@ -19,11 +19,10 @@ package org.apache.hadoop.hdds.security.token;
 
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
-import org.apache.hadoop.hdds.security.OzoneSecretManager;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
-import org.slf4j.Logger;
 
 import java.time.Instant;
 
@@ -33,54 +32,21 @@ import java.time.Instant;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public abstract class
-    ShortLivedTokenSecretManager<T extends ShortLivedTokenIdentifier>
-    extends OzoneSecretManager<T> {
+public abstract class ShortLivedTokenSecretManager
+    <T extends ShortLivedTokenIdentifier> {
+  private final long tokenMaxLifetime;
+  private SecretKeySignerClient secretKeyClient;
 
-  private static final Text SERVICE = new Text("HDDS_SERVICE");
-
-  protected ShortLivedTokenSecretManager(SecurityConfig conf,
-      long tokenLifetime, Logger logger) {
-    super(conf, tokenLifetime, tokenLifetime, SERVICE, logger);
-  }
-
-  @Override
-  public T createIdentifier() {
-    throw new SecurityException("Short-lived token requires additional " +
-        "information (owner, etc.).");
-  }
-
-  @Override
-  public long renewToken(Token<T> token, String renewer) {
-    throw new UnsupportedOperationException("Renew token operation is not " +
-        "supported for short-lived tokens.");
-  }
-
-  @Override
-  public T cancelToken(Token<T> token, String canceller) {
-    throw new UnsupportedOperationException("Cancel token operation is not " +
-        "supported for short-lived tokens.");
-  }
-
-  @Override
-  public byte[] retrievePassword(T identifier) throws InvalidToken {
-    validateToken(identifier);
-    return createPassword(identifier);
+  protected ShortLivedTokenSecretManager(
+      long tokenLifetime, SecretKeySignerClient secretKeyClient) {
+    this.tokenMaxLifetime = tokenLifetime;
+    this.secretKeyClient = secretKeyClient;
   }
 
-  /**
-   * Find the OzoneBlockTokenInfo for the given token id, and verify that if the
-   * token is not expired.
-   */
-  protected boolean validateToken(T identifier) throws InvalidToken {
-    Instant now = Instant.now();
-    if (identifier.isExpired(now)) {
-      throw new InvalidToken("token " + formatTokenId(identifier) + " is " +
-          "expired, current time: " + now +
-          " expiry time: " + identifier.getExpiry());
-    }
-
-    return true;
+  protected byte[] createPassword(T tokenId) {
+    ManagedSecretKey secretKey = secretKeyClient.getCurrentSecretKey();
+    tokenId.setSecretKeyId(secretKey.getId());
+    return secretKey.sign(tokenId);
   }
 
   /**
@@ -89,12 +55,21 @@ public abstract class
    * @return Expiry time.
    */
   protected Instant getTokenExpiryTime() {
-    return Instant.now().plusMillis(getTokenMaxLifetime());
+    return Instant.now().plusMillis(tokenMaxLifetime);
   }
 
   public Token<T> generateToken(T tokenIdentifier) {
+    byte[] password = createPassword(tokenIdentifier);
     return new Token<>(tokenIdentifier.getBytes(),
-        createPassword(tokenIdentifier), tokenIdentifier.getKind(),
+        password, tokenIdentifier.getKind(),
         new Text(tokenIdentifier.getService()));
   }
+
+  /**
+   * Allows integration-test to inject a custom implementation of
+   * SecretKeyClient to test without fully setting up a working secure cluster.
+   */
+  public void setSecretKeyClient(SecretKeySignerClient client) {
+    this.secretKeyClient = client;
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ShortLivedTokenVerifier.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ShortLivedTokenVerifier.java
index 92b643e285..4731f9149c 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ShortLivedTokenVerifier.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/ShortLivedTokenVerifier.java
@@ -20,17 +20,15 @@ package org.apache.hadoop.hdds.security.token;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProtoOrBuilder;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyVerifierClient;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.security.cert.CertificateExpiredException;
-import java.security.cert.CertificateNotYetValidException;
-import java.security.cert.X509Certificate;
 import java.time.Instant;
 import java.util.Objects;
 
@@ -42,13 +40,13 @@ public abstract class
     ShortLivedTokenVerifier<T extends ShortLivedTokenIdentifier>
     implements TokenVerifier {
 
-  private final CertificateClient caClient;
   private final SecurityConfig conf;
+  private final SecretKeyVerifierClient secretKeyClient;
 
   protected ShortLivedTokenVerifier(SecurityConfig conf,
-      CertificateClient caClient) {
+      SecretKeyVerifierClient secretKeyClient) {
     this.conf = conf;
-    this.caClient = caClient;
+    this.secretKeyClient = secretKeyClient;
   }
 
   /** Whether the specific kind of token is required for {@code cmdType}. */
@@ -75,11 +73,6 @@ public abstract class
       return;
     }
 
-    if (caClient == null) {
-      throw new SCMSecurityException("Certificate client not available " +
-          "to validate token");
-    }
-
     T tokenId = createTokenIdentifier();
     try {
       tokenId.readFields(new DataInputStream(new ByteArrayInputStream(
@@ -88,31 +81,9 @@ public abstract class
       throw new BlockTokenException("Failed to decode token : " + token);
     }
 
-    UserGroupInformation tokenUser = tokenId.getUser();
-    X509Certificate signerCert =
-        caClient.getCertificate(tokenId.getCertSerialId());
-
-    if (signerCert == null) {
-      throw new BlockTokenException("Can't find signer certificate " +
-          "(CertSerialId: " + tokenId.getCertSerialId() +
-          ") of the token for user: " + tokenUser);
-    }
-
-    try {
-      signerCert.checkValidity();
-    } catch (CertificateExpiredException exExp) {
-      throw new BlockTokenException("Token can't be verified due to " +
-          "expired certificate " + tokenId.getCertSerialId());
-    } catch (CertificateNotYetValidException exNyv) {
-      throw new BlockTokenException("Token can't be verified due to " +
-          "not yet valid certificate " + tokenId.getCertSerialId());
-    }
-
-    if (!caClient.verifySignature(tokenId.getBytes(), token.getPassword(),
-        signerCert)) {
-      throw new BlockTokenException("Invalid token for user: " + tokenUser);
-    }
+    verifyTokenPassword(tokenId, token.getPassword());
 
+    UserGroupInformation tokenUser = tokenId.getUser();
     // check expiration
     if (tokenId.isExpired(Instant.now())) {
       throw new BlockTokenException("Expired token for user: " + tokenUser);
@@ -132,4 +103,27 @@ public abstract class
   protected SecurityConfig getConf() {
     return conf;
   }
+
+  private void verifyTokenPassword(
+      ShortLivedTokenIdentifier tokenId, byte[] password)
+      throws SCMSecurityException {
+
+    ManagedSecretKey secretKey = secretKeyClient.getSecretKey(
+        tokenId.getSecretKeyId());
+    if (secretKey == null) {
+      throw new BlockTokenException("Can't find the signer secret key " +
+          tokenId.getSecretKeyId() + " of the token for user: " +
+          tokenId.getUser());
+    }
+
+    if (secretKey.isExpired()) {
+      throw new BlockTokenException("Token can't be verified due to " +
+          "expired secret key " + tokenId.getSecretKeyId());
+    }
+
+    if (!secretKey.isValidSignature(tokenId, password)) {
+      throw new BlockTokenException("Invalid token for user: " +
+          tokenId.getUser());
+    }
+  }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/TokenVerifier.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/TokenVerifier.java
index dbf79d5482..3301b68fcc 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/TokenVerifier.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/token/TokenVerifier.java
@@ -23,8 +23,8 @@ import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProtoOrBuilder;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyVerifierClient;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.security.token.Token;
 
 import java.io.IOException;
@@ -73,15 +73,14 @@ public interface TokenVerifier {
 
   /** Create appropriate token verifier based on the configuration. */
   static TokenVerifier create(SecurityConfig conf,
-      CertificateClient certClient) {
-
+      SecretKeyVerifierClient secretKeyClient) throws IOException {
     if (!conf.isBlockTokenEnabled() && !conf.isContainerTokenEnabled()) {
       return new NoopTokenVerifier();
     }
 
     List<TokenVerifier> list = new LinkedList<>();
-    list.add(new BlockTokenVerifier(conf, certClient));
-    list.add(new ContainerTokenVerifier(conf, certClient));
+    list.add(new BlockTokenVerifier(conf, secretKeyClient));
+    list.add(new ContainerTokenVerifier(conf, secretKeyClient));
     return new CompositeTokenVerifier(list);
   }
 }
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKeyTest.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKeyTest.java
new file mode 100644
index 0000000000..6db83186ac
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKeyTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.security.symmetric;
+
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.junit.jupiter.api.Test;
+
+import static com.google.common.collect.ImmutableSet.of;
+import static java.time.Duration.ofDays;
+import static java.time.Instant.now;
+import static org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey.fromProtobuf;
+import static org.apache.hadoop.hdds.security.symmetric.SecretKeyTestUtil.generateHmac;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Simple test cases for {@link ManagedSecretKey}.
+ */
+public class ManagedSecretKeyTest {
+
+  @Test
+  public void testSignAndVerifySuccess() throws Exception {
+    // Data can be signed and verified by same key.
+    byte[] data = RandomUtils.nextBytes(100);
+    ManagedSecretKey secretKey = generateHmac(now(), ofDays(1));
+    byte[] signature = secretKey.sign(data);
+    assertTrue(secretKey.isValidSignature(data, signature));
+
+    // Data can be signed and verified by same key transferred via network.
+    ManagedSecretKey transferredKey = fromProtobuf(secretKey.toProtobuf());
+    assertTrue(transferredKey.isValidSignature(data, signature));
+
+    // Token can be sign and verified by the same key.
+    OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier("owner",
+        new BlockID(1L, 1L), of(AccessModeProto.READ), 0L, 1L);
+    tokenId.setSecretKeyId(secretKey.getId());
+
+    signature = secretKey.sign(tokenId);
+    assertTrue(secretKey.isValidSignature(tokenId, signature));
+
+    // Token can be signed and verified by same key transferred via network.
+    assertTrue(transferredKey.isValidSignature(tokenId, signature));
+  }
+
+  @Test
+  public void testVerifyFailure() throws Exception {
+    byte[] data = RandomUtils.nextBytes(100);
+    ManagedSecretKey secretKey = generateHmac(now(), ofDays(1));
+    // random signature is not valid.
+    assertFalse(secretKey.isValidSignature(data, RandomUtils.nextBytes(100)));
+
+    // Data sign by one key can't be verified by another key.
+    byte[] signature = secretKey.sign(data);
+    ManagedSecretKey secretKey1 = generateHmac(now(), ofDays(1));
+    assertFalse(secretKey1.isValidSignature(data, signature));
+  }
+}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManagerTest.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManagerTest.java
index 053148e28d..e7fd24082c 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManagerTest.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyManagerTest.java
@@ -26,13 +26,10 @@ import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import javax.crypto.KeyGenerator;
-import javax.crypto.SecretKey;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Stream;
 
@@ -203,13 +200,7 @@ public class SecretKeyManagerTest {
 
   private static ManagedSecretKey generateKey(Instant creationTime)
       throws Exception {
-    KeyGenerator keyGen = KeyGenerator.getInstance(ALGORITHM);
-    SecretKey secretKey = keyGen.generateKey();
-    return new ManagedSecretKey(
-        UUID.randomUUID(),
-        creationTime,
-        creationTime.plus(VALIDITY_DURATION),
-        secretKey
-    );
+    return SecretKeyTestUtil.generateKey(ALGORITHM, creationTime,
+        VALIDITY_DURATION);
   }
 }
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyTestUtil.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyTestUtil.java
new file mode 100644
index 0000000000..272bedad54
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/symmetric/SecretKeyTestUtil.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.security.symmetric;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.UUID;
+
+/**
+ * Contains utility to test secret key logic.
+ */
+public final class SecretKeyTestUtil {
+  private SecretKeyTestUtil() {
+  }
+
+  public static ManagedSecretKey generateKey(
+      String algorithm, Instant creationTime, Duration validDuration)
+      throws NoSuchAlgorithmException {
+    KeyGenerator keyGen = KeyGenerator.getInstance(algorithm);
+    SecretKey secretKey = keyGen.generateKey();
+    return new ManagedSecretKey(
+        UUID.randomUUID(),
+        creationTime,
+        creationTime.plus(validDuration),
+        secretKey
+    );
+  }
+
+  public static ManagedSecretKey generateHmac(
+      Instant creationTime, Duration validDuration)
+      throws NoSuchAlgorithmException {
+    return generateKey("HmacSHA256", creationTime, validDuration);
+  }
+}
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestBlockTokenVerifier.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestBlockTokenVerifier.java
index c108a2670d..91825b09cc 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestBlockTokenVerifier.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestBlockTokenVerifier.java
@@ -23,8 +23,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyVerifierClient;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 
 import java.io.IOException;
@@ -46,8 +46,8 @@ public class TestBlockTokenVerifier
 
   @Override
   protected TokenVerifier newTestSubject(SecurityConfig secConf,
-      CertificateClient caClient) {
-    return new BlockTokenVerifier(secConf, caClient);
+      SecretKeyVerifierClient secretKeyClient) {
+    return new BlockTokenVerifier(secConf, secretKeyClient);
   }
 
   @Override
@@ -67,10 +67,12 @@ public class TestBlockTokenVerifier
 
   @Override
   protected OzoneBlockTokenIdentifier newTokenId() {
-    return new OzoneBlockTokenIdentifier("any user",
-        new BlockID(1, 0),
-        EnumSet.allOf(AccessModeProto.class),
-        Instant.now().plusSeconds(3600).toEpochMilli(),
-        CERT_ID, 100);
+    OzoneBlockTokenIdentifier tokenId =
+        new OzoneBlockTokenIdentifier("any user",
+            new BlockID(1, 0),
+            EnumSet.allOf(AccessModeProto.class),
+            Instant.now().plusSeconds(3600).toEpochMilli(), 100);
+    tokenId.setSecretKeyId(SECRET_KEY_ID);
+    return tokenId;
   }
 }
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestContainerTokenVerifier.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestContainerTokenVerifier.java
index 25616a62ec..1704226e5a 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestContainerTokenVerifier.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestContainerTokenVerifier.java
@@ -23,8 +23,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyVerifierClient;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 
 import java.io.IOException;
 import java.time.Instant;
@@ -48,8 +48,8 @@ public class TestContainerTokenVerifier
 
   @Override
   protected TokenVerifier newTestSubject(SecurityConfig secConf,
-      CertificateClient caClient) {
-    return new ContainerTokenVerifier(secConf, caClient);
+      SecretKeyVerifierClient secretKeyClient) {
+    return new ContainerTokenVerifier(secConf, secretKeyClient);
   }
 
   @Override
@@ -69,8 +69,10 @@ public class TestContainerTokenVerifier
 
   @Override
   protected ContainerTokenIdentifier newTokenId() {
-    return new ContainerTokenIdentifier("any user",
-        ContainerID.valueOf(CONTAINER_ID.incrementAndGet()), "123",
+    ContainerTokenIdentifier tokenId = new ContainerTokenIdentifier("any user",
+        ContainerID.valueOf(CONTAINER_ID.incrementAndGet()),
         Instant.now().plusSeconds(3600));
+    tokenId.setSecretKeyId(SECRET_KEY_ID);
+    return tokenId;
   }
 }
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestOzoneBlockTokenIdentifier.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestOzoneBlockTokenIdentifier.java
index dd8c277858..fbedf2de1a 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestOzoneBlockTokenIdentifier.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestOzoneBlockTokenIdentifier.java
@@ -17,157 +17,76 @@
  */
 package org.apache.hadoop.hdds.security.token;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.security.InvalidKeyException;
-import java.security.KeyPair;
-import java.security.NoSuchAlgorithmException;
-import java.security.NoSuchProviderException;
-import java.security.PrivateKey;
-import java.security.Signature;
-import java.security.SignatureException;
-import java.security.cert.Certificate;
-import java.security.cert.CertificateEncodingException;
-import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import javax.crypto.KeyGenerator;
-import javax.crypto.Mac;
-import javax.crypto.SecretKey;
-
-import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyTestUtil;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.security.token.Token;
-import org.apache.ozone.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.EnumSet;
+
+import static java.time.Duration.ofDays;
+import static java.time.Instant.now;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test class for {@link OzoneBlockTokenIdentifier}.
  */
 public class TestOzoneBlockTokenIdentifier {
+  private long expiryTime;
+  private ManagedSecretKey secretKey;
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(TestOzoneBlockTokenIdentifier.class);
-  private static final String BASEDIR = GenericTestUtils
-      .getTempPath(TestOzoneBlockTokenIdentifier.class.getSimpleName());
-  private static final String KEYSTORES_DIR =
-      new File(BASEDIR).getAbsolutePath();
-  private static long expiryTime;
-  private static KeyPair keyPair;
-  private static X509Certificate cert;
-
-  @BeforeAll
-  public static void setUp() throws Exception {
-    File base = new File(BASEDIR);
-    FileUtil.fullyDelete(base);
-    base.mkdirs();
+  @BeforeEach
+  public void setUp() throws Exception {
     expiryTime = Time.monotonicNow() + 60 * 60 * 24;
 
-    // Create Ozone Master key pair.
-    keyPair = KeyStoreTestUtil.generateKeyPair("RSA");
-    // Create Ozone Master certificate (SCM CA issued cert) and key store.
-    cert = KeyStoreTestUtil
-        .generateCertificate("CN=OzoneMaster", keyPair, 30, "SHA256withRSA");
-  }
-
-  @AfterEach
-  public void cleanUp() throws Exception {
-    // KeyStoreTestUtil.cleanupSSLConfig(KEYSTORES_DIR, sslConfsDir);
+    secretKey = SecretKeyTestUtil.generateHmac(now(), ofDays(1));
   }
 
   @Test
-  public void testSignToken() throws GeneralSecurityException, IOException {
-    String keystore = new File(KEYSTORES_DIR, "keystore.jks")
-        .getAbsolutePath();
-    String truststore = new File(KEYSTORES_DIR, "truststore.jks")
-        .getAbsolutePath();
-    String trustPassword = "trustPass";
-    String keyStorePassword = "keyStorePass";
-    String keyPassword = "keyPass";
-
-
-    KeyStoreTestUtil.createKeyStore(keystore, keyStorePassword, keyPassword,
-        "OzoneMaster", keyPair.getPrivate(), cert);
-
-    // Create trust store and put the certificate in the trust store
-    Map<String, X509Certificate> certs = Collections.singletonMap("server",
-        cert);
-    KeyStoreTestUtil.createTrustStore(truststore, trustPassword, certs);
-
-    // Sign the OzoneMaster Token with Ozone Master private key
-    PrivateKey privateKey = keyPair.getPrivate();
+  public void testSignToken() {
     OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(
         "testUser", "84940",
-        EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
-        expiryTime, cert.getSerialNumber().toString(), 128L);
-    byte[] signedToken = signTokenAsymmetric(tokenId, privateKey);
-
-    // Verify a valid signed OzoneMaster Token with Ozone Master
-    // public key(certificate)
-    boolean isValidToken = verifyTokenAsymmetric(tokenId, signedToken, cert);
-    LOG.info("{} is {}", tokenId, isValidToken ? "valid." : "invalid.");
+        EnumSet.allOf(AccessModeProto.class),
+        expiryTime, 128L);
+    tokenId.setSecretKeyId(secretKey.getId());
+    byte[] signedToken = secretKey.sign(tokenId);
 
-    // Verify an invalid signed OzoneMaster Token with Ozone Master
-    // public key(certificate)
-    tokenId = new OzoneBlockTokenIdentifier("", "",
-        EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
-        expiryTime, cert.getSerialNumber().toString(), 128L);
-    LOG.info("Unsigned token {} is {}", tokenId,
-        verifyTokenAsymmetric(tokenId, RandomUtils.nextBytes(128), cert));
+    // Verify a valid signed OzoneMaster Token with Ozone Master.
+    assertTrue(secretKey.isValidSignature(tokenId.getBytes(), signedToken));
 
+    // Verify an invalid signed OzoneMaster Token with Ozone Master.
+    assertFalse(secretKey.isValidSignature(tokenId.getBytes(),
+        RandomUtils.nextBytes(128)));
   }
 
   @Test
-  public void testTokenSerialization() throws GeneralSecurityException,
+  public void testTokenSerialization() throws
       IOException {
-    String keystore = new File(KEYSTORES_DIR, "keystore.jks")
-        .getAbsolutePath();
-    String truststore = new File(KEYSTORES_DIR, "truststore.jks")
-        .getAbsolutePath();
-    String trustPassword = "trustPass";
-    String keyStorePassword = "keyStorePass";
-    String keyPassword = "keyPass";
     long maxLength = 128L;
 
-    KeyStoreTestUtil.createKeyStore(keystore, keyStorePassword, keyPassword,
-        "OzoneMaster", keyPair.getPrivate(), cert);
-
-    // Create trust store and put the certificate in the trust store
-    Map<String, X509Certificate> certs = Collections.singletonMap("server",
-        cert);
-    KeyStoreTestUtil.createTrustStore(truststore, trustPassword, certs);
-
-    // Sign the OzoneMaster Token with Ozone Master private key
-    PrivateKey privateKey = keyPair.getPrivate();
     OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(
         "testUser", "84940",
-        EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
-        expiryTime, cert.getSerialNumber().toString(), maxLength);
-    byte[] signedToken = signTokenAsymmetric(tokenId, privateKey);
-
+        EnumSet.allOf(AccessModeProto.class),
+        expiryTime, maxLength);
+    tokenId.setSecretKeyId(secretKey.getId());
+    byte[] signedToken = secretKey.sign(tokenId);
 
-    Token<OzoneBlockTokenIdentifier> token = new Token(tokenId.getBytes(),
+    Token<OzoneBlockTokenIdentifier> token = new Token<>(tokenId.getBytes(),
         signedToken, tokenId.getKind(), new Text("host:port"));
 
     String encodeToUrlString = token.encodeToUrlString();
 
-    Token<OzoneBlockTokenIdentifier>decodedToken = new Token();
+    Token<OzoneBlockTokenIdentifier>decodedToken = new Token<>();
     decodedToken.decodeFromUrlString(encodeToUrlString);
 
     OzoneBlockTokenIdentifier decodedTokenId = new OzoneBlockTokenIdentifier();
@@ -177,136 +96,8 @@ public class TestOzoneBlockTokenIdentifier {
     Assertions.assertEquals(tokenId, decodedTokenId);
     Assertions.assertEquals(maxLength, decodedTokenId.getMaxLength());
 
-    // Verify a decoded signed Token with public key(certificate)
-    boolean isValidToken = verifyTokenAsymmetric(decodedTokenId, decodedToken
-        .getPassword(), cert);
-    LOG.info("{} is {}", tokenId, isValidToken ? "valid." : "invalid.");
-  }
-
-
-  public byte[] signTokenAsymmetric(OzoneBlockTokenIdentifier tokenId,
-      PrivateKey privateKey) throws NoSuchAlgorithmException,
-      InvalidKeyException, SignatureException {
-    Signature rsaSignature = Signature.getInstance("SHA256withRSA");
-    rsaSignature.initSign(privateKey);
-    rsaSignature.update(tokenId.getBytes());
-    byte[] signature = rsaSignature.sign();
-    return signature;
-  }
-
-  public boolean verifyTokenAsymmetric(OzoneBlockTokenIdentifier tokenId,
-      byte[] signature, Certificate certificate) throws InvalidKeyException,
-      NoSuchAlgorithmException, SignatureException {
-    Signature rsaSignature = Signature.getInstance("SHA256withRSA");
-    rsaSignature.initVerify(certificate);
-    rsaSignature.update(tokenId.getBytes());
-    boolean isValid = rsaSignature.verify(signature);
-    return isValid;
-  }
-
-  private byte[] signTokenSymmetric(OzoneBlockTokenIdentifier identifier,
-      Mac mac, SecretKey key) {
-    try {
-      mac.init(key);
-    } catch (InvalidKeyException ike) {
-      throw new IllegalArgumentException("Invalid key to HMAC computation",
-          ike);
-    }
-    return mac.doFinal(identifier.getBytes());
-  }
-
-  OzoneBlockTokenIdentifier generateTestToken() {
-    return new OzoneBlockTokenIdentifier(RandomStringUtils.randomAlphabetic(6),
-        RandomStringUtils.randomAlphabetic(5),
-        EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
-        expiryTime, cert.getSerialNumber().toString(), 1024768L);
-  }
-
-  @Test
-  public void testAsymmetricTokenPerf() throws NoSuchAlgorithmException,
-      CertificateEncodingException, NoSuchProviderException,
-      InvalidKeyException, SignatureException {
-    final int testTokenCount = 1000;
-    List<OzoneBlockTokenIdentifier> tokenIds = new ArrayList<>();
-    List<byte[]> tokenPasswordAsym = new ArrayList<>();
-    for (int i = 0; i < testTokenCount; i++) {
-      tokenIds.add(generateTestToken());
-    }
-
-    KeyPair kp = KeyStoreTestUtil.generateKeyPair("RSA");
-
-    // Create Ozone Master certificate (SCM CA issued cert) and key store
-    X509Certificate certificate;
-    certificate = KeyStoreTestUtil.generateCertificate("CN=OzoneMaster",
-        kp, 30, "SHA256withRSA");
-
-    long startTime = Time.monotonicNowNanos();
-    for (int i = 0; i < testTokenCount; i++) {
-      tokenPasswordAsym.add(
-          signTokenAsymmetric(tokenIds.get(i), kp.getPrivate()));
-    }
-    long duration = Time.monotonicNowNanos() - startTime;
-    LOG.info("Average token sign time with HmacSha256(RSA/1024 key) is {} ns",
-        duration / testTokenCount);
-
-    startTime = Time.monotonicNowNanos();
-    for (int i = 0; i < testTokenCount; i++) {
-      verifyTokenAsymmetric(tokenIds.get(i), tokenPasswordAsym.get(i),
-          certificate);
-    }
-    duration = Time.monotonicNowNanos() - startTime;
-    LOG.info("Average token verify time with HmacSha256(RSA/1024 key) "
-        + "is {} ns", duration / testTokenCount);
-  }
-
-  @Test
-  public void testSymmetricTokenPerf() {
-    String hmacSHA1 = "HmacSHA1";
-    String hmacSHA256 = "HmacSHA256";
-
-    testSymmetricTokenPerfHelper(hmacSHA1, 64);
-    testSymmetricTokenPerfHelper(hmacSHA256, 1024);
-  }
-
-  public void testSymmetricTokenPerfHelper(String hmacAlgorithm, int keyLen) {
-    final int testTokenCount = 1000;
-    List<OzoneBlockTokenIdentifier> tokenIds = new ArrayList<>();
-    List<byte[]> tokenPasswordSym = new ArrayList<>();
-    for (int i = 0; i < testTokenCount; i++) {
-      tokenIds.add(generateTestToken());
-    }
-
-    KeyGenerator keyGen;
-    try {
-      keyGen = KeyGenerator.getInstance(hmacAlgorithm);
-      keyGen.init(keyLen);
-    } catch (NoSuchAlgorithmException nsa) {
-      throw new IllegalArgumentException("Can't find " + hmacAlgorithm +
-          " algorithm.");
-    }
-
-    Mac mac;
-    try {
-      mac = Mac.getInstance(hmacAlgorithm);
-    } catch (NoSuchAlgorithmException nsa) {
-      throw new IllegalArgumentException("Can't find " + hmacAlgorithm +
-          " algorithm.");
-    }
-
-    SecretKey secretKey = keyGen.generateKey();
-
-    long startTime = Time.monotonicNowNanos();
-    for (int i = 0; i < testTokenCount; i++) {
-      tokenPasswordSym.add(
-          signTokenSymmetric(tokenIds.get(i), mac, secretKey));
-    }
-    long duration = Time.monotonicNowNanos() - startTime;
-    LOG.info("Average token sign time with {}({} symmetric key) is {} ns",
-        hmacAlgorithm, keyLen, duration / testTokenCount);
-  }
-
-  // TODO: verify certificate with a trust store
-  public boolean verifyCert(Certificate certificate) {
-    return true;
+    // Verify a decoded signed Token
+    assertTrue(secretKey.isValidSignature(decodedTokenId,
+        decodedToken.getPassword()));
   }
 }
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestOzoneBlockTokenSecretManager.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestOzoneBlockTokenSecretManager.java
index 72aacb4a90..d2c1a6d326 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestOzoneBlockTokenSecretManager.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TestOzoneBlockTokenSecretManager.java
@@ -18,19 +18,6 @@
 
 package org.apache.hadoop.hdds.security.token;
 
-import static org.apache.hadoop.ozone.container.ContainerTestHelper.getBlockRequest;
-import static org.apache.hadoop.ozone.container.ContainerTestHelper.getReadChunkRequest;
-import static org.apache.hadoop.ozone.container.ContainerTestHelper.newPutBlockRequestBuilder;
-import static org.apache.hadoop.ozone.container.ContainerTestHelper.newWriteChunkRequestBuilder;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.AdditionalAnswers.delegatesTo;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -38,48 +25,37 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyVerifierClient;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyTestUtil;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
-import org.apache.hadoop.hdds.security.x509.certificate.client.DefaultCertificateClient;
-import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.security.token.Token;
 import org.apache.ozone.test.GenericTestUtils;
-import org.apache.ozone.test.LambdaTestUtils;
-import org.apache.hadoop.util.Time;
-import org.bouncycastle.asn1.x500.X500Name;
-import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
-import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
-import org.bouncycastle.cert.X509CertificateHolder;
-import org.bouncycastle.cert.X509v1CertificateBuilder;
-import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
-import org.bouncycastle.crypto.util.PrivateKeyFactory;
-import org.bouncycastle.operator.ContentSigner;
-import org.bouncycastle.operator.DefaultDigestAlgorithmIdentifierFinder;
-import org.bouncycastle.operator.DefaultSignatureAlgorithmIdentifierFinder;
-import org.bouncycastle.operator.OperatorCreationException;
-import org.bouncycastle.operator.bc.BcRSAContentSignerBuilder;
-import org.bouncycastle.pkcs.PKCS10CertificationRequest;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.slf4j.Logger;
+import org.mockito.Mockito;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
-import java.io.IOException;
-import java.math.BigInteger;
-import java.nio.file.Path;
-import java.security.KeyPair;
-import java.security.SecureRandom;
-import java.security.Signature;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-import java.time.Instant;
-import java.util.Date;
+import java.security.NoSuchAlgorithmException;
 import java.util.EnumSet;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import static java.time.Duration.ofDays;
+import static java.time.Instant.now;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.getBlockRequest;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.getReadChunkRequest;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.newPutBlockRequestBuilder;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.newWriteChunkRequestBuilder;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
 /**
  * Test class for {@link OzoneBlockTokenSecretManager}.
  */
@@ -87,14 +63,15 @@ public class TestOzoneBlockTokenSecretManager {
 
   private static final String BASEDIR = GenericTestUtils
       .getTempPath(TestOzoneBlockTokenSecretManager.class.getSimpleName());
-  private static final String ALGORITHM = "SHA256withRSA";
+  private static final String ALGORITHM = "HmacSHA256";
 
   private OzoneBlockTokenSecretManager secretManager;
-  private KeyPair keyPair;
-  private String omCertSerialId;
-  private CertificateClient client;
+  private UUID secretKeyId;
+  private SecretKeyVerifierClient secretKeyClient;
+  private SecretKeySignerClient secretKeySignerClient;
   private TokenVerifier tokenVerifier;
   private Pipeline pipeline;
+  private ManagedSecretKey secretKey;
 
   @Before
   public void setUp() throws Exception {
@@ -103,34 +80,19 @@ public class TestOzoneBlockTokenSecretManager {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, BASEDIR);
     conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED, true);
-    // Create Ozone Master key pair.
-    keyPair = KeyStoreTestUtil.generateKeyPair("RSA");
-    // Create Ozone Master certificate (SCM CA issued cert) and key store.
     SecurityConfig securityConfig = new SecurityConfig(conf);
-    X509Certificate x509Certificate = KeyStoreTestUtil
-        .generateCertificate("CN=OzoneMaster", keyPair, 30, ALGORITHM);
-    omCertSerialId = x509Certificate.getSerialNumber().toString();
-    secretManager = new OzoneBlockTokenSecretManager(securityConfig,
-        TimeUnit.HOURS.toMillis(1));
-    Logger log = mock(Logger.class);
-    DefaultCertificateClient toStub =
-        new DefaultCertificateClient(
-            securityConfig, log, null, "test", null, null) {
-          @Override
-          protected String signAndStoreCertificate(
-              PKCS10CertificationRequest request, Path certificatePath) {
-            return null;
-          }
-        };
-    client = mock(DefaultCertificateClient.class, delegatesTo(toStub));
-    doReturn(x509Certificate).when(client).getCertificate();
-    doReturn(x509Certificate).when(client).getCertificate(anyString());
-    doReturn(keyPair.getPublic()).when(client).getPublicKey();
-    doReturn(keyPair.getPrivate()).when(client).getPrivateKey();
-    doReturn(null).when(client).signData(any(byte[].class));
-
-    secretManager.start(client);
-    tokenVerifier = new BlockTokenVerifier(securityConfig, client);
+
+    secretKey = generateValidSecretKey();
+    secretKeyId = secretKey.getId();
+
+    secretKeyClient = Mockito.mock(SecretKeyVerifierClient.class);
+    secretKeySignerClient = Mockito.mock(SecretKeySignerClient.class);
+    when(secretKeySignerClient.getCurrentSecretKey()).thenReturn(secretKey);
+    when(secretKeyClient.getSecretKey(secretKeyId)).thenReturn(secretKey);
+
+    secretManager = new OzoneBlockTokenSecretManager(
+        TimeUnit.HOURS.toMillis(1), secretKeySignerClient);
+    tokenVerifier = new BlockTokenVerifier(securityConfig, secretKeyClient);
   }
 
   @After
@@ -152,7 +114,7 @@ public class TestOzoneBlockTokenSecretManager {
         identifier.getService());
     Assert.assertEquals(EnumSet.allOf(AccessModeProto.class),
         identifier.getAccessModes());
-    Assert.assertEquals(omCertSerialId, identifier.getCertSerialId());
+    Assert.assertEquals(secretKeyId, identifier.getSecretKeyId());
 
     validateHash(token.getPassword(), token.getIdentifier());
   }
@@ -169,9 +131,8 @@ public class TestOzoneBlockTokenSecretManager {
         btIdentifier.getService());
     Assert.assertEquals(EnumSet.allOf(AccessModeProto.class),
         btIdentifier.getAccessModes());
-    Assert.assertEquals(omCertSerialId, btIdentifier.getCertSerialId());
-
     byte[] hash = secretManager.createPassword(btIdentifier);
+    Assert.assertEquals(secretKeyId, btIdentifier.getSecretKeyId());
     validateHash(hash, btIdentifier.getBytes());
   }
 
@@ -223,57 +184,8 @@ public class TestOzoneBlockTokenSecretManager {
         OzoneBlockTokenIdentifier.getTokenService(otherBlockID)));
   }
 
-  /**
-   * Validate hash using public key of KeyPair.
-   * */
   private void validateHash(byte[] hash, byte[] identifier) throws Exception {
-    Signature rsaSignature =
-        Signature.getInstance(secretManager.getDefaultSignatureAlgorithm());
-    rsaSignature.initVerify(client.getPublicKey());
-    rsaSignature.update(identifier);
-    assertTrue(rsaSignature.verify(hash));
-  }
-
-  @Test
-  @SuppressWarnings("java:S2699")
-  public void testCreateIdentifierFailure() throws Exception {
-    LambdaTestUtils.intercept(SecurityException.class,
-        "Ozone block token can't be created without owner and access mode "
-            + "information.", () -> {
-          secretManager.createIdentifier();
-        });
-  }
-
-  @Test
-  @SuppressWarnings("java:S2699")
-  public void testRenewToken() throws Exception {
-    LambdaTestUtils.intercept(UnsupportedOperationException.class,
-        "Renew token operation is not supported for ozone block" +
-            " tokens.", () -> {
-          secretManager.renewToken(null, null);
-        });
-  }
-
-  @Test
-  @SuppressWarnings("java:S2699")
-  public void testCancelToken() throws Exception {
-    LambdaTestUtils.intercept(UnsupportedOperationException.class,
-        "Cancel token operation is not supported for ozone block" +
-            " tokens.", () -> {
-          secretManager.cancelToken(null, null);
-        });
-  }
-
-  @Test
-  @SuppressWarnings("java:S2699")
-  public void testVerifySignatureFailure() throws Exception {
-    OzoneBlockTokenIdentifier id = new OzoneBlockTokenIdentifier(
-        "testUser", "123", EnumSet.allOf(AccessModeProto.class),
-        Time.now() + 60 * 60 * 24, "123444", 1024);
-    LambdaTestUtils.intercept(UnsupportedOperationException.class, "operation" +
-            " is not supported for block tokens",
-        () -> secretManager.verifySignature(id,
-            client.signData(id.getBytes())));
+    assertTrue(secretKey.isValidSignature(identifier, hash));
   }
 
   @Test
@@ -327,7 +239,7 @@ public class TestOzoneBlockTokenSecretManager {
   }
 
   @Test
-  public void testExpiredCertificate() throws Exception {
+  public void testExpiredSecretKey() throws Exception {
     String user = "testUser2";
     BlockID blockID = new BlockID(102, 0);
     Token<OzoneBlockTokenIdentifier> token =
@@ -341,79 +253,23 @@ public class TestOzoneBlockTokenSecretManager {
     tokenVerifier.verify("testUser", token, writeChunkRequest);
 
     // Mock client with an expired cert
-    X509Certificate expiredCert = generateExpiredCert(
-        "CN=OzoneMaster", keyPair, ALGORITHM);
-    when(client.getCertificate(anyString())).thenReturn(expiredCert);
+    ManagedSecretKey expiredSecretKey = generateExpiredSecretKey();
+    when(secretKeyClient.getSecretKey(any())).thenReturn(expiredSecretKey);
 
     BlockTokenException e = assertThrows(BlockTokenException.class,
         () -> tokenVerifier.verify(user, token, writeChunkRequest));
     String msg = e.getMessage();
     assertTrue(msg, msg.contains("Token can't be verified due to" +
-        " expired certificate"));
-  }
-
-  @Test
-  public void testNetYetValidCertificate() throws Exception {
-    String user = "testUser2";
-    BlockID blockID = new BlockID(102, 0);
-    Token<OzoneBlockTokenIdentifier> token =
-        secretManager.generateToken(user, blockID,
-            EnumSet.allOf(AccessModeProto.class), 100);
-    ContainerCommandRequestProto writeChunkRequest =
-        newWriteChunkRequestBuilder(pipeline, blockID, 100)
-        .setEncodedToken(token.encodeToUrlString())
-        .build();
-
-    tokenVerifier.verify(user, token, writeChunkRequest);
-
-    // Mock client with an expired cert
-    X509Certificate netYetValidCert = generateNotValidYetCert(
-        "CN=OzoneMaster", keyPair, ALGORITHM);
-    when(client.getCertificate(anyString())).
-        thenReturn(netYetValidCert);
-
-    BlockTokenException e = assertThrows(BlockTokenException.class,
-        () -> tokenVerifier.verify(user, token, writeChunkRequest));
-    String msg = e.getMessage();
-    assertTrue(msg, msg.contains("Token can't be verified due to not" +
-        " yet valid certificate"));
-  }
-
-  private X509Certificate generateExpiredCert(String dn,
-      KeyPair pair, String algorithm) throws CertificateException,
-      IllegalStateException, IOException, OperatorCreationException {
-    Date from = new Date();
-    // Set end date same as start date to make sure the cert is expired.
-    return generateTestCert(dn, pair, algorithm, from, from);
+        " expired secret key"));
   }
 
-  private X509Certificate generateNotValidYetCert(String dn,
-      KeyPair pair, String algorithm) throws CertificateException,
-      IllegalStateException, IOException, OperatorCreationException {
-    Date from = new Date(Instant.now().toEpochMilli() + 100000L);
-    Date to = new Date(from.getTime() + 200000L);
-    return generateTestCert(dn, pair, algorithm, from, to);
+  private ManagedSecretKey generateValidSecretKey()
+      throws NoSuchAlgorithmException {
+    return SecretKeyTestUtil.generateKey(ALGORITHM, now(), ofDays(1));
   }
 
-  private X509Certificate generateTestCert(String dn,
-      KeyPair pair, String algorithm, Date from, Date to)
-      throws CertificateException, IllegalStateException,
-      IOException, OperatorCreationException {
-    BigInteger sn = new BigInteger(64, new SecureRandom());
-    SubjectPublicKeyInfo subPubKeyInfo = SubjectPublicKeyInfo.getInstance(
-        pair.getPublic().getEncoded());
-    X500Name subjectDN = new X500Name(dn);
-    X509v1CertificateBuilder builder = new X509v1CertificateBuilder(
-        subjectDN, sn, from, to, subjectDN, subPubKeyInfo);
-
-    AlgorithmIdentifier sigAlgId =
-        new DefaultSignatureAlgorithmIdentifierFinder().find(algorithm);
-    AlgorithmIdentifier digAlgId =
-        new DefaultDigestAlgorithmIdentifierFinder().find(sigAlgId);
-    ContentSigner signer =
-        new BcRSAContentSignerBuilder(sigAlgId, digAlgId)
-            .build(PrivateKeyFactory.createKey(pair.getPrivate().getEncoded()));
-    X509CertificateHolder holder = builder.build(signer);
-    return new JcaX509CertificateConverter().getCertificate(holder);
+  private ManagedSecretKey generateExpiredSecretKey() throws Exception {
+    return SecretKeyTestUtil.generateKey(ALGORITHM,
+        now().minus(ofDays(2)), ofDays(1));
   }
 }
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TokenVerifierTests.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TokenVerifierTests.java
index 009b7a0105..1c7085b35b 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TokenVerifierTests.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/token/TokenVerifierTests.java
@@ -19,25 +19,30 @@ package org.apache.hadoop.hdds.security.token;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyVerifierClient;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.crypto.SecretKey;
 import java.io.IOException;
-import java.security.cert.CertificateException;
-import java.security.cert.CertificateExpiredException;
-import java.security.cert.CertificateNotYetValidException;
-import java.security.cert.X509Certificate;
+import java.time.Duration;
 import java.time.Instant;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -51,13 +56,13 @@ public abstract class TokenVerifierTests<T extends ShortLivedTokenIdentifier> {
   private static final Logger LOG =
       LoggerFactory.getLogger(TokenVerifierTests.class);
 
-  protected static final String CERT_ID = "123";
+  protected static final UUID SECRET_KEY_ID = UUID.randomUUID();
 
   /**
    * Create the specific kind of TokenVerifier.
    */
   protected abstract TokenVerifier newTestSubject(
-      SecurityConfig secConf, CertificateClient caClient);
+      SecurityConfig secConf, SecretKeyVerifierClient secretKeyClient);
 
   /**
    * @return the config key to enable/disable the specific kind of tokens
@@ -82,98 +87,138 @@ public abstract class TokenVerifierTests<T extends ShortLivedTokenIdentifier> {
   @Test
   public void skipsVerificationIfDisabled() throws IOException {
     // GIVEN
-    CertificateClient caClient = mock(CertificateClient.class);
-    TokenVerifier subject = newTestSubject(tokenDisabled(), caClient);
+    SecretKeyVerifierClient secretKeyClient = mock(
+        SecretKeyVerifierClient.class);
+    TokenVerifier subject = newTestSubject(tokenDisabled(), secretKeyClient);
 
     // WHEN
     subject.verify("anyUser", anyToken(), verifiedRequest(newTokenId()));
 
     // THEN
-    verify(caClient, never()).getCertificate(any());
+    verify(secretKeyClient, never()).getSecretKey(any());
   }
 
   @Test
   public void skipsVerificationForMiscCommands() throws IOException {
     // GIVEN
-    CertificateClient caClient = mock(CertificateClient.class);
-    TokenVerifier subject = newTestSubject(tokenEnabled(), caClient);
+    SecretKeyVerifierClient secretKeyClient = mock(
+        SecretKeyVerifierClient.class);
+    TokenVerifier subject = newTestSubject(tokenEnabled(), secretKeyClient);
 
     // WHEN
     subject.verify("anyUser", anyToken(), unverifiedRequest());
 
     // THEN
-    verify(caClient, never()).getCertificate(any());
+    verify(secretKeyClient, never()).getSecretKey(any());
   }
 
   @Test
-  public void rejectsExpiredCertificate() throws Exception {
-    rejectsInvalidCertificate(CertificateExpiredException.class);
-  }
+  public void rejectsExpiredSecretKey() throws Exception {
+    // GIVEN
+    SecretKeyVerifierClient secretKeyClient =
+        mock(SecretKeyVerifierClient.class);
 
-  @Test
-  public void rejectsNotYetValidCertificate() throws Exception {
-    rejectsInvalidCertificate(CertificateNotYetValidException.class);
+    Instant past = Instant.now().minus(Duration.ofHours(1));
+    ManagedSecretKey expiredSecretKey = new ManagedSecretKey(UUID.randomUUID(),
+        past, past, Mockito.mock(SecretKey.class));
+
+    when(secretKeyClient.getSecretKey(SECRET_KEY_ID))
+        .thenReturn(expiredSecretKey);
+    T tokenId = newTokenId();
+    ContainerCommandRequestProto cmd = verifiedRequest(tokenId);
+    TokenVerifier subject = newTestSubject(tokenEnabled(), secretKeyClient);
+
+    // WHEN+THEN
+    ShortLivedTokenSecretManager<T> secretManager = new MockTokenManager();
+    Token<T> token = secretManager.generateToken(tokenId);
+    BlockTokenException ex = assertThrows(BlockTokenException.class, () ->
+        subject.verify("anyUser", token, cmd));
+    assertThat(ex.getMessage(), containsString("expired secret key"));
   }
 
-  private void rejectsInvalidCertificate(
-      Class<? extends CertificateException> problem) throws Exception {
+  @Test
+  public void rejectsTokenWithInvalidSecretId() throws Exception {
     // GIVEN
-    CertificateClient caClient = mock(CertificateClient.class);
-    X509Certificate cert = invalidCertificate(problem);
-    when(caClient.getCertificate(CERT_ID)).thenReturn(cert);
-    ContainerCommandRequestProto cmd = verifiedRequest(newTokenId());
-    TokenVerifier subject = newTestSubject(tokenEnabled(), caClient);
+    SecretKeyVerifierClient secretKeyClient =
+        mock(SecretKeyVerifierClient.class);
+
+    when(secretKeyClient.getSecretKey(SECRET_KEY_ID)).thenReturn(null);
+    T tokenId = newTokenId();
+    ContainerCommandRequestProto cmd = verifiedRequest(tokenId);
+    TokenVerifier subject = newTestSubject(tokenEnabled(), secretKeyClient);
 
     // WHEN+THEN
-    assertThrows(BlockTokenException.class, () ->
-        subject.verify("anyUser", anyToken(), cmd));
+    ShortLivedTokenSecretManager<T> secretManager = new MockTokenManager();
+    Token<T> token = secretManager.generateToken(tokenId);
+    BlockTokenException ex = assertThrows(BlockTokenException.class, () ->
+        subject.verify("anyUser", token, cmd));
+    assertThat(ex.getMessage(),
+        containsString("Can't find the signer secret key"));
   }
 
   @Test
   public void rejectsInvalidSignature() throws Exception {
     // GIVEN
-    CertificateClient caClient = mock(CertificateClient.class);
-    when(caClient.getCertificate(CERT_ID)).thenReturn(validCertificate());
-    Token<?> invalidToken = new Token<>();
-    validSignature(caClient, false);
-    ContainerCommandRequestProto cmd = verifiedRequest(newTokenId());
-    TokenVerifier subject = newTestSubject(tokenEnabled(), caClient);
+    SecretKeyVerifierClient secretKeyClient =
+        mockSecretKeyClient(false);
+
+    ShortLivedTokenSecretManager<T> secretManager = new MockTokenManager();
+    T tokenId = newTokenId();
+    Token<?> invalidToken = secretManager.generateToken(tokenId);
+    ContainerCommandRequestProto cmd = verifiedRequest(tokenId);
+    TokenVerifier subject = newTestSubject(tokenEnabled(), secretKeyClient);
 
     // WHEN+THEN
-    assertThrows(BlockTokenException.class, () ->
-        subject.verify("anyUser", invalidToken, cmd));
+    BlockTokenException ex =
+        assertThrows(BlockTokenException.class, () ->
+            subject.verify("anyUser", invalidToken, cmd));
+    assertThat(ex.getMessage(),
+        containsString("Invalid token for user"));
+  }
+
+  @NotNull
+  private SecretKeyVerifierClient mockSecretKeyClient(boolean validSignature)
+      throws IOException {
+    SecretKeyVerifierClient secretKeyClient =
+        mock(SecretKeyVerifierClient.class);
+    ManagedSecretKey validSecretKey = Mockito.mock(ManagedSecretKey.class);
+    when(secretKeyClient.getSecretKey(SECRET_KEY_ID))
+        .thenReturn(validSecretKey);
+    when(validSecretKey.isValidSignature((TokenIdentifier) any(), any()))
+        .thenReturn(validSignature);
+    return secretKeyClient;
   }
 
   @Test
   public void rejectsExpiredToken() throws Exception {
     // GIVEN
-    SecurityConfig conf = tokenEnabled();
-    CertificateClient caClient = mock(CertificateClient.class);
-    when(caClient.getCertificate(CERT_ID)).thenReturn(validCertificate());
-    validSignature(caClient, true);
-    ShortLivedTokenSecretManager<T> secretManager = new MockTokenManager(conf);
+    SecretKeyVerifierClient secretKeyClient = mockSecretKeyClient(true);
+
+    ShortLivedTokenSecretManager<T> secretManager = new MockTokenManager();
     T tokenId = expired(newTokenId());
     ContainerCommandRequestProto cmd = verifiedRequest(tokenId);
     Token<?> token = secretManager.generateToken(tokenId);
-    TokenVerifier subject = newTestSubject(tokenEnabled(), caClient);
+    TokenVerifier subject = newTestSubject(tokenEnabled(), secretKeyClient);
 
     // WHEN+THEN
-    assertThrows(BlockTokenException.class, () ->
-        subject.verify("anyUser", token, cmd));
+    BlockTokenException ex =
+        assertThrows(BlockTokenException.class, () ->
+            subject.verify("anyUser", token, cmd));
+    assertThat(ex.getMessage(),
+        containsString("Expired token for user"));
   }
 
   @Test
   public void acceptsValidToken() throws Exception {
     // GIVEN
     SecurityConfig conf = tokenEnabled();
-    CertificateClient caClient = mock(CertificateClient.class);
-    when(caClient.getCertificate(CERT_ID)).thenReturn(validCertificate());
-    validSignature(caClient, true);
-    ShortLivedTokenSecretManager<T> secretManager = new MockTokenManager(conf);
+    SecretKeyVerifierClient secretKeyClient = mockSecretKeyClient(true);
+
+    ShortLivedTokenSecretManager<T> secretManager = new MockTokenManager();
     T tokenId = valid(newTokenId());
     ContainerCommandRequestProto cmd = verifiedRequest(tokenId);
     Token<?> token = secretManager.generateToken(tokenId);
-    TokenVerifier subject = newTestSubject(conf, caClient);
+    TokenVerifier subject = newTestSubject(conf, secretKeyClient);
 
     // WHEN+THEN
     subject.verify("anyUser", token, cmd);
@@ -189,24 +234,6 @@ public abstract class TokenVerifierTests<T extends ShortLivedTokenIdentifier> {
     return tokenId;
   }
 
-  private void validSignature(CertificateClient caClient, boolean valid)
-      throws Exception {
-    when(caClient.verifySignature(any(byte[].class), any(), any()))
-        .thenReturn(valid);
-  }
-
-  private static X509Certificate invalidCertificate(
-      Class<? extends CertificateException> problem)
-      throws CertificateExpiredException, CertificateNotYetValidException {
-    X509Certificate cert = mock(X509Certificate.class);
-    doThrow(problem).when(cert).checkValidity();
-    return cert;
-  }
-
-  private static X509Certificate validCertificate() {
-    return mock(X509Certificate.class);
-  }
-
   protected SecurityConfig tokenDisabled() {
     return getSecurityConfig(false);
   }
@@ -230,8 +257,9 @@ public abstract class TokenVerifierTests<T extends ShortLivedTokenIdentifier> {
    */
   private class MockTokenManager extends ShortLivedTokenSecretManager<T> {
 
-    MockTokenManager(SecurityConfig conf) {
-      super(conf, TimeUnit.HOURS.toMillis(1), LOG);
+    MockTokenManager() {
+      super(TimeUnit.HOURS.toMillis(1),
+          Mockito.mock(SecretKeySignerClient.class));
     }
 
     @Override
diff --git a/hadoop-hdds/framework/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/hadoop-hdds/framework/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000000..72652da02f
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+mock-maker-inline
diff --git a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
index d5a3c6f65a..6487e30fce 100644
--- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
+++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto
@@ -529,7 +529,8 @@ message ContainerTokenSecretProto {
     required string ownerId = 1;
     required ContainerID containerId = 2;
     required uint64 expiryDate = 3;
-    required string certSerialId = 4;
+    optional string certSerialId = 4 [deprecated=true];
+    optional UUID secretKeyId = 5;
 }
 
 message GetContainerTokenRequestProto {
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 16ea4887aa..c5d1ae9a3a 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -357,9 +357,10 @@ message BlockTokenSecretProto {
     required string ownerId = 1;
     required string blockId = 2;
     required uint64 expiryDate = 3;
-    required string omCertSerialId = 4;
+    optional string omCertSerialId = 4 [deprecated=true];
     repeated AccessModeProto modes = 5;
     required uint64 maxLength = 6;
+    optional UUID secretKeyId = 7;
 }
 
 message BlockID {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
index 07b74cc42d..e3fd7d416b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
@@ -179,13 +179,13 @@ public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
   @Override
   public ManagedSecretKey getCurrentSecretKey() throws SCMSecurityException {
     validateSecretKeyStatus();
-    return secretKeyManager.getCurrentKey();
+    return secretKeyManager.getCurrentSecretKey();
   }
 
   @Override
   public ManagedSecretKey getSecretKey(UUID id) throws SCMSecurityException {
     validateSecretKeyStatus();
-    return secretKeyManager.getKey(id);
+    return secretKeyManager.getSecretKey(id);
   }
 
   @Override
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index a14cd2cdca..82d912f7d3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -131,7 +131,6 @@ import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.PipelineChoosePolic
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.IncrementalContainerReportFromDatanode;
-import org.apache.hadoop.hdds.security.OzoneSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCAServer;
@@ -164,7 +163,6 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.Containe
 
 import javax.management.ObjectName;
 import java.io.IOException;
-import java.io.UncheckedIOException;
 import java.math.BigInteger;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -955,8 +953,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       scmCertificateClient = new SCMCertificateClient(securityConfig,
           certSerialNumber, SCM_ROOT_CA_COMPONENT_NAME);
     }
-    return new ContainerTokenSecretManager(securityConfig,
-        expiryTime);
+    return new ContainerTokenSecretManager(expiryTime,
+        secretKeyManagerService.getSecretKeyManager());
   }
 
   /**
@@ -1456,7 +1454,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     }
 
     scmHAManager.start();
-    startSecretManagerIfNecessary();
 
     ms = HddsServerUtil
         .initializeMetrics(configuration, "StorageContainerManager");
@@ -1610,8 +1607,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       LOG.error("SCM block manager service stop failed.", ex);
     }
 
-    stopSecretManager();
-
     if (metrics != null) {
       metrics.unRegister();
     }
@@ -1765,6 +1760,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     return scmSafeModeManager;
   }
 
+  @VisibleForTesting
+  public SecretKeyManager getSecretKeyManager() {
+    return secretKeyManagerService != null ?
+        secretKeyManagerService.getSecretKeyManager() : null;
+  }
+
   @Override
   public ReplicationManager getReplicationManager() {
     return replicationManager;
@@ -1950,46 +1951,6 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     return scmHANodeDetails.getLocalNodeDetails().getNodeId();
   }
 
-  private void startSecretManagerIfNecessary() {
-    boolean shouldRun = securityConfig.isSecurityEnabled()
-        && securityConfig.isContainerTokenEnabled()
-        && containerTokenMgr != null;
-    if (shouldRun) {
-      boolean running = containerTokenMgr.isRunning();
-      if (!running) {
-        startSecretManager();
-      }
-    }
-  }
-
-  private void startSecretManager() {
-    try {
-      scmCertificateClient.assertValidKeysAndCertificate();
-    } catch (OzoneSecurityException e) {
-      LOG.error("Unable to read key pair.", e);
-      throw new UncheckedIOException(e);
-    }
-    try {
-      LOG.info("Starting token manager");
-      containerTokenMgr.start(scmCertificateClient);
-    } catch (IOException e) {
-      // Unable to start secret manager.
-      LOG.error("Error starting block token secret manager.", e);
-      throw new UncheckedIOException(e);
-    }
-  }
-
-  private void stopSecretManager() {
-    if (containerTokenMgr != null) {
-      LOG.info("Stopping block token manager.");
-      try {
-        containerTokenMgr.stop();
-      } catch (IOException e) {
-        LOG.error("Failed to stop block token manager", e);
-      }
-    }
-  }
-
   public ContainerTokenGenerator getContainerTokenGenerator() {
     return containerTokenMgr != null
         ? containerTokenMgr
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 7af924454b..b8eb7b0cb1 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -148,7 +148,7 @@ public class TestEndPoint {
       conf.setBoolean(
           OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
       OzoneContainer ozoneContainer = new OzoneContainer(
-          datanodeDetails, conf, getContext(datanodeDetails), null);
+          datanodeDetails, conf, getContext(datanodeDetails));
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
       VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
           conf, ozoneContainer);
@@ -182,7 +182,7 @@ public class TestEndPoint {
         serverAddress, 1000)) {
       DatanodeDetails datanodeDetails = randomDatanodeDetails();
       OzoneContainer ozoneContainer = new OzoneContainer(
-          datanodeDetails, conf, getContext(datanodeDetails), null);
+          datanodeDetails, conf, getContext(datanodeDetails));
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
 
       String clusterId = scmServerImpl.getClusterId();
@@ -261,7 +261,7 @@ public class TestEndPoint {
           .captureLogs(VersionEndpointTask.LOG);
       DatanodeDetails datanodeDetails = randomDatanodeDetails();
       OzoneContainer ozoneContainer = new OzoneContainer(
-          datanodeDetails, conf, getContext(datanodeDetails), null);
+          datanodeDetails, conf, getContext(datanodeDetails));
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
       VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
           conf, ozoneContainer);
@@ -310,7 +310,7 @@ public class TestEndPoint {
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
       DatanodeDetails datanodeDetails = randomDatanodeDetails();
       OzoneContainer ozoneContainer = new OzoneContainer(
-          datanodeDetails, conf, getContext(datanodeDetails), null);
+          datanodeDetails, conf, getContext(datanodeDetails));
       VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
           conf, ozoneContainer);
       EndpointStateMachine.EndPointStates newState = versionTask.call();
@@ -338,7 +338,7 @@ public class TestEndPoint {
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
       DatanodeDetails datanodeDetails = randomDatanodeDetails();
       OzoneContainer ozoneContainer = new OzoneContainer(
-          datanodeDetails, conf, getContext(datanodeDetails), null);
+          datanodeDetails, conf, getContext(datanodeDetails));
       VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
           conf, ozoneContainer);
 
@@ -576,7 +576,7 @@ public class TestEndPoint {
 
     // Create a datanode state machine for stateConext used by endpoint task
     try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
-        randomDatanodeDetails(), conf, null, null, null);
+        randomDatanodeDetails(), conf);
         EndpointStateMachine rpcEndPoint =
             createEndpoint(conf, scmAddress, rpcTimeout)) {
       HddsProtos.DatanodeDetailsProto datanodeDetailsProto =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
index d1a98e890d..e246bd8f46 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/storage/TestContainerCommandsEC.java
@@ -43,10 +43,10 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.WritableECContainerProvider.WritableECContainerProviderConfig;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
 import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
 import org.apache.hadoop.hdds.security.token.ContainerTokenSecretManager;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -57,6 +57,7 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.SecretKeyTestClient;
 import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -105,7 +106,6 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.WRITE;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
 import static org.apache.hadoop.ozone.container.ContainerTestHelper.newWriteChunkRequestBuilder;
 
 /**
@@ -144,6 +144,7 @@ public class TestContainerCommandsEC {
   private static Token<ContainerTokenIdentifier> containerToken;
   private static ContainerTokenSecretManager containerTokenGenerator;
   private static OzoneBlockTokenSecretManager blockTokenGenerator;
+  private static SecretKeyClient secretKeyClient;
   private List<XceiverClientSpi> clients = null;
   private static OzoneConfiguration config;
   private static CertificateClient certClient;
@@ -402,7 +403,7 @@ public class TestContainerCommandsEC {
         XceiverClientManager xceiverClientManager =
             new XceiverClientManager(config);
         ECReconstructionCoordinator coordinator =
-            new ECReconstructionCoordinator(config, certClient,
+            new ECReconstructionCoordinator(config, certClient, secretKeyClient,
                  null, ECReconstructionMetrics.create())) {
 
       ECReconstructionMetrics metrics =
@@ -597,7 +598,7 @@ public class TestContainerCommandsEC {
 
     Assert.assertThrows(IOException.class, () -> {
       try (ECReconstructionCoordinator coordinator =
-          new ECReconstructionCoordinator(config, certClient,
+          new ECReconstructionCoordinator(config, certClient,  secretKeyClient,
               null, ECReconstructionMetrics.create())) {
         coordinator.reconstructECContainerGroup(conID,
             (ECReplicationConfig) containerPipeline.getReplicationConfig(),
@@ -659,10 +660,12 @@ public class TestContainerCommandsEC {
 
     OzoneManager.setTestSecureOmFlag(true);
     certClient = new CertificateClientTestImpl(conf);
+    secretKeyClient = new SecretKeyTestClient();
 
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(NUM_DN)
         .setScmId(SCM_ID).setClusterId(CLUSTER_ID)
         .setCertificateClient(certClient)
+        .setSecretKeyClient(secretKeyClient)
         .build();
     cluster.waitForClusterToBeReady();
     cluster.getOzoneManager().startSecretManager();
@@ -706,16 +709,11 @@ public class TestContainerCommandsEC {
     pipeline = pipelines.get(0);
     datanodeDetails = pipeline.getNodes();
 
-    OzoneConfiguration tweakedConfig = new OzoneConfiguration(config);
-    tweakedConfig.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
-    SecurityConfig conf = new SecurityConfig(tweakedConfig);
     long tokenLifetime = TimeUnit.DAYS.toMillis(1);
     containerTokenGenerator = new ContainerTokenSecretManager(
-        conf, tokenLifetime);
-    containerTokenGenerator.start(certClient);
+        tokenLifetime, secretKeyClient);
     blockTokenGenerator = new OzoneBlockTokenSecretManager(
-        conf, tokenLifetime);
-    blockTokenGenerator.start(certClient);
+        tokenLifetime, secretKeyClient);
     containerToken = containerTokenGenerator
         .generateToken(ANY_USER, new ContainerID(containerID));
   }
@@ -732,14 +730,6 @@ public class TestContainerCommandsEC {
     if (cluster != null) {
       cluster.shutdown();
     }
-
-    if (blockTokenGenerator != null) {
-      blockTokenGenerator.stop();
-    }
-
-    if (containerTokenGenerator != null) {
-      containerTokenGenerator.stop();
-    }
   }
 
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index e7a4cf0319..69ad5374c9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -346,6 +347,7 @@ public interface MiniOzoneCluster {
     protected int numDataVolumes = 1;
     protected boolean  startDataNodes = true;
     protected CertificateClient certClient;
+    protected SecretKeyClient secretKeyClient;
     protected int pipelineNumLimit = DEFAULT_PIPELINE_LIMIT;
 
     protected Builder(OzoneConfiguration conf) {
@@ -408,6 +410,11 @@ public interface MiniOzoneCluster {
       return this;
     }
 
+    public Builder setSecretKeyClient(SecretKeyClient client) {
+      this.secretKeyClient = client;
+      return this;
+    }
+
     /**
      * Sets the SCM id.
      *
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index b9e338870f..3e631b9c19 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.client.OzoneClient;
@@ -126,6 +127,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
   // Timeout for the cluster to be ready
   private int waitForClusterToBeReadyTimeout = 120000; // 2 min
   private CertificateClient caClient;
+  private SecretKeyClient secretKeyClient;
 
   /**
    * Creates a new MiniOzoneCluster with Recon.
@@ -483,7 +485,8 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
   @Override
   public void startHddsDatanodes() {
     hddsDatanodes.forEach((datanode) -> {
-      datanode.setCertificateClient(getCAClient());
+      datanode.setCertificateClient(caClient);
+      datanode.setSecretKeyClient(secretKeyClient);
       datanode.start();
     });
   }
@@ -510,14 +513,14 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
     stopRecon(reconServer);
   }
 
-  private CertificateClient getCAClient() {
-    return this.caClient;
-  }
-
   private void setCAClient(CertificateClient client) {
     this.caClient = client;
   }
 
+  private void setSecretKeyClient(SecretKeyClient client) {
+    this.secretKeyClient = client;
+  }
+
   private static void stopDatanodes(
       Collection<HddsDatanodeService> hddsDatanodes) {
     if (!hddsDatanodes.isEmpty()) {
@@ -591,6 +594,9 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
         if (certClient != null) {
           om.setCertClient(certClient);
         }
+        if (secretKeyClient != null) {
+          om.setSecretKeyClient(secretKeyClient);
+        }
         om.start();
 
         if (includeRecon) {
@@ -607,6 +613,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
             hddsDatanodes, reconServer);
 
         cluster.setCAClient(certClient);
+        cluster.setSecretKeyClient(secretKeyClient);
         if (startDataNodes) {
           cluster.startHddsDatanodes();
         }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index 0fbdb01b41..fb4162e4ef 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -139,7 +139,7 @@ public class TestMiniOzoneCluster {
 
       for (int i = 0; i < 3; i++) {
         stateMachines.add(new DatanodeStateMachine(
-            randomDatanodeDetails(), ozoneConf, null, null, null));
+            randomDatanodeDetails(), ozoneConf));
       }
 
       //we need to start all the servers to get the fix ports
@@ -184,11 +184,11 @@ public class TestMiniOzoneCluster {
     ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
     try (
         DatanodeStateMachine sm1 = new DatanodeStateMachine(
-            randomDatanodeDetails(), ozoneConf,  null, null, null);
+            randomDatanodeDetails(), ozoneConf);
         DatanodeStateMachine sm2 = new DatanodeStateMachine(
-            randomDatanodeDetails(), ozoneConf,  null, null, null);
+            randomDatanodeDetails(), ozoneConf);
         DatanodeStateMachine sm3 = new DatanodeStateMachine(
-            randomDatanodeDetails(), ozoneConf,  null, null, null);
+            randomDatanodeDetails(), ozoneConf);
     ) {
       HashSet<Integer> ports = new HashSet<Integer>();
       assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
index 9ec8750d78..84423cabac 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
@@ -179,7 +179,6 @@ public final class TestSecretKeysApi {
         ozoneKeytab.getAbsolutePath());
   }
 
-
   /**
    * Test secret key apis in happy case.
    */
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index b2ad993b9f..0e51620360 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -31,16 +31,12 @@ import java.security.cert.X509Certificate;
 import java.time.Duration;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
-import java.time.ZoneId;
 import java.time.temporal.ChronoUnit;
-import java.util.ArrayList;
 import java.util.Date;
-import java.util.List;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 
-import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.conf.DefaultConfigManager;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -52,30 +48,20 @@ import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMHANodeDetails;
 import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
 import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.proxy.SCMContainerLocationFailoverProxyProvider;
 import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
-import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
-import org.apache.hadoop.hdds.security.x509.certificate.authority.CAType;
-import org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultApprover;
-import org.apache.hadoop.hdds.security.x509.certificate.authority.profile.DefaultProfile;
-import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
 import org.apache.hadoop.hdds.security.x509.certificate.client.DNCertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.authority.DefaultCAServer;
-import org.apache.hadoop.hdds.security.x509.certificate.client.DefaultCertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
-import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.SelfSignedCertificate;
 import org.apache.hadoop.hdds.security.x509.exception.CertificateException;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
@@ -88,8 +74,6 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.common.Storage;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMStorage;
@@ -97,9 +81,6 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
-import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
-import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
-import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransportFactory;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.security.OMCertificateClient;
@@ -118,15 +99,8 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_DEFAULT_DURATION;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_MAX_DURATION;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_DEFAULT_DURATION_DEFAULT;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_RENEW_GRACE_DURATION;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_ENABLED;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECURITY_SSL_KEYSTORE_RELOAD_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SECURITY_SSL_TRUSTSTORE_RELOAD_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
@@ -145,7 +119,6 @@ import static org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigString
 import static org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig.ConfigStrings.HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.net.ServerSocketUtil.getPort;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.DELEGATION_TOKEN_MAX_LIFETIME_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_KERBEROS_KEYTAB_FILE;
@@ -154,7 +127,6 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_F
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_S3_GPRC_SERVER_ENABLED;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_TRANSPORT_CLASS;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_EXPIRED;
 import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
 import org.apache.ratis.protocol.ClientId;
@@ -1189,200 +1161,6 @@ public final class TestSecureOzoneCluster {
     }
   }
 
-  /**
-   * Tests container token renewal after a certificate renew.
-   */
-  @Test
-  public void testContainerTokenRenewCrossCertificateRenew() throws Exception {
-    // Setup secure SCM for start.
-    final int certLifetime = 40 * 1000; // 40s
-    conf.set(HDDS_X509_DEFAULT_DURATION,
-        Duration.ofMillis(certLifetime).toString());
-    conf.set(HDDS_X509_MAX_DURATION,
-        Duration.ofMillis(certLifetime).toString());
-    conf.set(HDDS_X509_RENEW_GRACE_DURATION,
-        Duration.ofMillis(certLifetime - 15 * 1000).toString());
-    conf.setBoolean(HDDS_CONTAINER_TOKEN_ENABLED, true);
-    conf.setLong(HDDS_BLOCK_TOKEN_EXPIRY_TIME, certLifetime - 20 * 1000);
-
-    initSCM();
-    scm = HddsTestUtils.getScmSimple(conf);
-    try {
-      CertificateClientTestImpl certClient =
-          new CertificateClientTestImpl(conf, true);
-      X509Certificate scmCert = certClient.getCertificate();
-      String scmCertId1 = scmCert.getSerialNumber().toString();
-      // Start SCM
-      scm.setScmCertificateClient(certClient);
-      scm.start();
-
-      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-      // Get SCM client which will authenticate via Kerberos
-      SCMContainerLocationFailoverProxyProvider proxyProvider =
-          new SCMContainerLocationFailoverProxyProvider(conf, ugi);
-      StorageContainerLocationProtocolClientSideTranslatorPB scmClient =
-          new StorageContainerLocationProtocolClientSideTranslatorPB(
-              proxyProvider);
-
-      // Since client is already connected get a delegation token
-      ContainerID containerID = new ContainerID(1);
-      Token<?> token1 = scmClient.getContainerToken(containerID);
-
-      // Check if token is of right kind and renewer is running instance
-      assertNotNull(token1);
-      assertEquals(ContainerTokenIdentifier.KIND, token1.getKind());
-      assertEquals(containerID.toString(), token1.getService().toString());
-      ContainerTokenIdentifier temp = new ContainerTokenIdentifier();
-      ByteArrayInputStream buf = new ByteArrayInputStream(
-          token1.getIdentifier());
-      DataInputStream in = new DataInputStream(buf);
-      temp.readFields(in);
-      assertEquals(scmCertId1, temp.getCertSerialId());
-
-      // Wait for SCM certificate to renew
-      GenericTestUtils.waitFor(() -> !scmCertId1.equals(
-              certClient.getCertificate().getSerialNumber().toString()),
-          100, certLifetime);
-      String scmCertId2 =
-          certClient.getCertificate().getSerialNumber().toString();
-      assertNotEquals(scmCertId1, scmCertId2);
-
-      // Get a new container token
-      containerID = new ContainerID(2);
-      Token<?> token2 = scmClient.getContainerToken(containerID);
-      buf = new ByteArrayInputStream(token2.getIdentifier());
-      in = new DataInputStream(buf);
-      temp.readFields(in);
-      assertEquals(scmCertId2, temp.getCertSerialId());
-    } finally {
-      if (scm != null) {
-        scm.stop();
-      }
-    }
-  }
-
-  /**
-   * Test functionality to get SCM signed certificate for OM.
-   */
-  @Test
-  public void testOMGrpcServerCertificateRenew() throws Exception {
-    initSCM();
-    try {
-      scm = HddsTestUtils.getScmSimple(conf);
-      scm.start();
-
-      conf.set(OZONE_METADATA_DIRS, omMetaDirPath.toString());
-      int certLifetime = 30; // second
-      conf.set(HDDS_X509_DEFAULT_DURATION,
-          Duration.ofSeconds(certLifetime).toString());
-      conf.set(HDDS_SECURITY_SSL_KEYSTORE_RELOAD_INTERVAL, "1s");
-      conf.set(HDDS_SECURITY_SSL_TRUSTSTORE_RELOAD_INTERVAL, "1s");
-      conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 2);
-
-      // initialize OmStorage, save om Cert and CA Certs to disk
-      OMStorage omStore = new OMStorage(conf);
-      omStore.setClusterId(clusterId);
-      omStore.setOmId(omId);
-
-      // Prepare the certificates for OM before OM start
-      SecurityConfig securityConfig = new SecurityConfig(conf);
-      CertificateClient scmCertClient = scm.getScmCertificateClient();
-      CertificateCodec certCodec = new CertificateCodec(securityConfig, "om");
-      X509Certificate scmCert = scmCertClient.getCertificate();
-      X509Certificate rootCert = scmCertClient.getCACertificate();
-      X509CertificateHolder certHolder = generateX509CertHolder(conf, keyPair,
-          new KeyPair(scmCertClient.getPublicKey(),
-              scmCertClient.getPrivateKey()), scmCert,
-          Duration.ofSeconds(certLifetime),
-          InetAddress.getLocalHost().getCanonicalHostName(), clusterId);
-      String certId = certHolder.getSerialNumber().toString();
-      certCodec.writeCertificate(certHolder);
-      certCodec.writeCertificate(CertificateCodec.getCertificateHolder(scmCert),
-          String.format(DefaultCertificateClient.CERT_FILE_NAME_FORMAT,
-          CAType.SUBORDINATE.getFileNamePrefix() +
-              scmCert.getSerialNumber().toString()));
-      certCodec.writeCertificate(CertificateCodec.getCertificateHolder(
-          scmCertClient.getCACertificate()),
-          String.format(DefaultCertificateClient.CERT_FILE_NAME_FORMAT,
-              CAType.ROOT.getFileNamePrefix() +
-                  rootCert.getSerialNumber().toString()));
-      omStore.setOmCertSerialId(certId);
-      omStore.initialize();
-
-      conf.setBoolean(HDDS_GRPC_TLS_ENABLED, true);
-      conf.setBoolean(OZONE_OM_S3_GPRC_SERVER_ENABLED, true);
-      conf.setBoolean(HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT, true);
-      OzoneManager.setTestSecureOmFlag(true);
-      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-      // In this process, SCM has already login using Kerberos. So pass
-      // specific UGI to DefaultCertificateClient and OzoneManager to avoid
-      // conflict with SCM procedure.
-      DefaultCertificateClient.setUgi(ugi);
-      OzoneManager.setUgi(ugi);
-      om = OzoneManager.createOm(conf);
-      om.start();
-
-      CertificateClient omCertClient = om.getCertificateClient();
-      X509Certificate omCert = omCertClient.getCertificate();
-      X509Certificate caCert = omCertClient.getCACertificate();
-      X509Certificate rootCaCert = omCertClient.getRootCACertificate();
-      List certList = new ArrayList<>();
-      certList.add(caCert);
-      certList.add(rootCaCert);
-      // set certificates in GrpcOmTransport
-      GrpcOmTransport.setCaCerts(certList);
-
-      GenericTestUtils.waitFor(() -> om.isLeaderReady(), 500, 10000);
-      String transportCls = GrpcOmTransportFactory.class.getName();
-      conf.set(OZONE_OM_TRANSPORT_CLASS, transportCls);
-      OzoneClient client = OzoneClientFactory.getRpcClient(conf);
-
-      ServiceInfoEx serviceInfoEx = client.getObjectStore()
-          .getClientProxy().getOzoneManagerClient().getServiceInfo();
-      Assert.assertTrue(serviceInfoEx.getCaCertificate().equals(
-          CertificateCodec.getPEMEncodedString(caCert)));
-
-      // Wait for OM certificate to renewed
-      GenericTestUtils.waitFor(() ->
-              !omCert.getSerialNumber().toString().equals(
-                  omCertClient.getCertificate().getSerialNumber().toString()),
-          500, certLifetime * 1000);
-
-      // rerun the command using old client, it should succeed
-      serviceInfoEx = client.getObjectStore()
-          .getClientProxy().getOzoneManagerClient().getServiceInfo();
-      Assert.assertTrue(serviceInfoEx.getCaCertificate().equals(
-          CertificateCodec.getPEMEncodedString(caCert)));
-      client.close();
-
-      // get new client, it should succeed.
-      try {
-        OzoneClient client1 = OzoneClientFactory.getRpcClient(conf);
-        client1.close();
-      } catch (Exception e) {
-        System.out.println("OzoneClientFactory.getRpcClient failed for " +
-            e.getMessage());
-        fail("Create client should succeed for certificate is renewed");
-      }
-
-      // Wait for old OM certificate to expire
-      GenericTestUtils.waitFor(() -> omCert.getNotAfter().before(new Date()),
-          500, certLifetime * 1000);
-      // get new client, it should succeed too.
-      try {
-        OzoneClientFactory.getRpcClient(conf);
-      } catch (Exception e) {
-        System.out.println("OzoneClientFactory.getRpcClient failed for " +
-            e.getMessage());
-        fail("Create client should succeed for certificate is renewed");
-      }
-    } finally {
-      DefaultCertificateClient.setUgi(null);
-      OzoneManager.setUgi(null);
-      GrpcOmTransport.setCaCerts(null);
-    }
-  }
-
   public void validateCertificate(X509Certificate cert) throws Exception {
 
     // Assert that we indeed have a self signed certificate.
@@ -1448,37 +1226,4 @@ public final class TestSecureOzoneCluster {
         .setScmID("test")
         .build();
   }
-
-  private static X509CertificateHolder generateX509CertHolder(
-      OzoneConfiguration conf, KeyPair keyPair, KeyPair rootKeyPair,
-      X509Certificate rootCert, Duration certLifetime, String subject,
-      String clusterId) throws Exception {
-    // Generate normal certificate, signed by RootCA certificate
-    SecurityConfig secConfig = new SecurityConfig(conf);
-    DefaultApprover approver = new DefaultApprover(new DefaultProfile(),
-        secConfig);
-
-    CertificateSignRequest.Builder csrBuilder =
-        new CertificateSignRequest.Builder();
-    // Get host name.
-    csrBuilder.setKey(keyPair)
-        .setConfiguration(conf)
-        .setScmID("test")
-        .setClusterID(clusterId)
-        .setSubject(subject)
-        .setDigitalSignature(true)
-        .setDigitalEncryption(true);
-
-    LocalDateTime start = LocalDateTime.now();
-    String certDuration = conf.get(HDDS_X509_DEFAULT_DURATION,
-        HDDS_X509_DEFAULT_DURATION_DEFAULT);
-    X509CertificateHolder certificateHolder =
-        approver.sign(secConfig, rootKeyPair.getPrivate(),
-            new X509CertificateHolder(rootCert.getEncoded()),
-            Date.from(start.atZone(ZoneId.systemDefault()).toInstant()),
-            Date.from(start.plus(Duration.parse(certDuration))
-                .atZone(ZoneId.systemDefault()).toInstant()),
-            csrBuilder.build(), "test", clusterId);
-    return certificateHolder;
-  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/SecretKeyTestClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/SecretKeyTestClient.java
new file mode 100644
index 0000000000..8742560e25
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/SecretKeyTestClient.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client;
+
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Test implementation of {@link SecretKeyClient}.
+ */
+public class SecretKeyTestClient implements SecretKeyClient {
+  private final Map<UUID, ManagedSecretKey> keysMap = new HashMap<>();
+  private ManagedSecretKey current;
+
+  public SecretKeyTestClient() {
+    rotate();
+  }
+
+  public void rotate() {
+    this.current = generateKey();
+    keysMap.put(current.getId(), current);
+  }
+
+  @Override
+  public ManagedSecretKey getCurrentSecretKey() {
+    return current;
+  }
+
+  @Override
+  public ManagedSecretKey getSecretKey(UUID id) {
+    return keysMap.get(id);
+  }
+
+  private ManagedSecretKey generateKey() {
+    KeyGenerator keyGen = null;
+    try {
+      keyGen = KeyGenerator.getInstance("HmacSHA256");
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException("Should never happen", e);
+    }
+    SecretKey secretKey = keyGen.generateKey();
+    return new ManagedSecretKey(
+        UUID.randomUUID(),
+        Instant.now(),
+        Instant.now().plus(Duration.ofHours(1)),
+        secretKey
+    );
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 3e19a4577c..2aff017474 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.SecretKeyTestClient;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.TestHelper;
@@ -112,6 +113,7 @@ public class TestContainerStateMachine {
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
             .setHbInterval(200)
             .setCertificateClient(new CertificateClientTestImpl(conf))
+            .setSecretKeyClient(new SecretKeyTestClient())
             .build();
     cluster.setWaitForClusterToBeReadyTimeout(300000);
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java
index a7dc787a8b..61eeec0739 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFlushDelay.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.SecretKeyTestClient;
 import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
@@ -116,6 +117,7 @@ public class TestContainerStateMachineFlushDelay {
             .setStreamBufferSizeUnit(StorageUnit.BYTES)
             .setHbInterval(200)
             .setCertificateClient(new CertificateClientTestImpl(conf))
+            .setSecretKeyClient(new SecretKeyTestClient())
             .build();
     cluster.waitForClusterToBeReady();
     cluster.getOzoneManager().startSecretManager();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
index 479eea5f7f..fcb42a56eb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneAtRestEncryption.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.hdds.scm.storage.MultipartInputStream;
+import org.apache.hadoop.ozone.client.SecretKeyTestClient;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
@@ -152,6 +153,7 @@ public class TestOzoneAtRestEncryption {
         .setChunkSize(CHUNK_SIZE)
         .setStreamBufferSizeUnit(StorageUnit.BYTES)
         .setCertificateClient(certificateClientTest)
+        .setSecretKeyClient(new SecretKeyTestClient())
         .build();
     cluster.getOzoneManager().startSecretManager();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
index b8fc491769..353023f186 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
@@ -43,6 +42,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.SecretKeyTestClient;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -62,7 +62,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRespo
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3Authentication;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.LambdaTestUtils;
@@ -97,7 +96,6 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
   private static final String CLUSTER_ID = UUID.randomUUID().toString();
   private static File testDir;
   private static OzoneConfiguration conf;
-  private static OzoneBlockTokenSecretManager secretManager;
 
   /**
    * Create a MiniOzoneCluster for testing.
@@ -131,10 +129,8 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
         .setScmId(SCM_ID)
         .setClusterId(CLUSTER_ID)
         .setCertificateClient(certificateClientTest)
+        .setSecretKeyClient(new SecretKeyTestClient())
         .build();
-    secretManager = new OzoneBlockTokenSecretManager(new SecurityConfig(conf),
-        60 * 60);
-    secretManager.start(certificateClientTest);
     cluster.getOzoneManager().startSecretManager();
     cluster.waitForClusterToBeReady();
     ozClient = OzoneClientFactory.getRpcClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 41e46c0685..1d9a86f41b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -80,7 +80,7 @@ public class TestOzoneContainer {
       DatanodeStateMachine dsm = Mockito.mock(DatanodeStateMachine.class);
       Mockito.when(dsm.getDatanodeDetails()).thenReturn(datanodeDetails);
       Mockito.when(context.getParent()).thenReturn(dsm);
-      container = new OzoneContainer(datanodeDetails, conf, context, null);
+      container = new OzoneContainer(datanodeDetails, conf, context);
       //Set clusterId and manually start ozone container.
       container.start(UUID.randomUUID().toString());
 
@@ -112,8 +112,7 @@ public class TestOzoneContainer {
       DatanodeStateMachine dsm = Mockito.mock(DatanodeStateMachine.class);
       Mockito.when(dsm.getDatanodeDetails()).thenReturn(datanodeDetails);
       Mockito.when(context.getParent()).thenReturn(dsm);
-      container = new OzoneContainer(datanodeDetails, conf,
-          context, null);
+      container = new OzoneContainer(datanodeDetails, conf, context);
 
       String clusterId = UUID.randomUUID().toString();
       container.start(clusterId);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
index c3f4902d00..de7570993d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
@@ -27,12 +27,13 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
 import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
 import org.apache.hadoop.hdds.security.token.ContainerTokenSecretManager;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.client.SecretKeyTestClient;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -101,6 +102,7 @@ public class TestOzoneContainerWithTLS {
   private OzoneConfiguration conf;
   private ContainerTokenSecretManager secretManager;
   private CertificateClientTestImpl caClient;
+  private SecretKeyClient secretKeyClient;
   private boolean containerTokenEnabled;
   private int certLifetime = 15 * 1000; // 15s
 
@@ -148,8 +150,9 @@ public class TestOzoneContainerWithTLS {
         TimeUnit.MILLISECONDS);
 
     caClient = new CertificateClientTestImpl(conf);
-    secretManager = new ContainerTokenSecretManager(new SecurityConfig(conf),
-        expiryTime);
+    secretKeyClient = new SecretKeyTestClient();
+    secretManager = new ContainerTokenSecretManager(expiryTime,
+        secretKeyClient);
   }
 
   @Test(expected = CertificateExpiredException.class)
@@ -181,7 +184,8 @@ public class TestOzoneContainerWithTLS {
       conf.setBoolean(
           OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
 
-      container = new OzoneContainer(dn, conf, getContext(dn), caClient);
+      container = new OzoneContainer(dn, conf, getContext(dn), caClient,
+          secretKeyClient);
       //Set scmId and manually start ozone container.
       container.start(UUID.randomUUID().toString());
 
@@ -189,7 +193,6 @@ public class TestOzoneContainerWithTLS {
           Collections.singletonList(caClient.getCACertificate()));
 
       if (containerTokenEnabled) {
-        secretManager.start(caClient);
         client.connect();
         createSecureContainer(client, containerId,
             secretManager.generateToken(
@@ -220,15 +223,12 @@ public class TestOzoneContainerWithTLS {
 
     OzoneContainer container = null;
     try {
-      container = new OzoneContainer(dn, conf, getContext(dn), caClient);
+      container = new OzoneContainer(dn, conf, getContext(dn), caClient,
+          secretKeyClient);
 
       // Set scmId and manually start ozone container.
       container.start(UUID.randomUUID().toString());
 
-      if (containerTokenEnabled) {
-        secretManager.start(caClient);
-      }
-
       // Create containers
       long containerId = ContainerTestHelper.getTestContainerID();
       List<Long> containerIdList = new ArrayList<>();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java
index ba741a71f4..3af62f0e04 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java
@@ -19,23 +19,25 @@
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
 import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
 import org.apache.hadoop.hdds.security.token.ContainerTokenSecretManager;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClientTestImpl;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.ozone.client.SecretKeyTestClient;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -93,6 +95,7 @@ public class TestSecureOzoneContainer {
   private final boolean hasToken;
   private final boolean tokenExpired;
   private CertificateClientTestImpl caClient;
+  private SecretKeyClient secretKeyClient;
   private ContainerTokenSecretManager secretManager;
 
   public TestSecureOzoneContainer(Boolean requireToken,
@@ -122,8 +125,9 @@ public class TestSecureOzoneContainer {
     conf.set(OZONE_METADATA_DIRS, ozoneMetaPath);
     secConfig = new SecurityConfig(conf);
     caClient = new CertificateClientTestImpl(conf);
+    secretKeyClient = new SecretKeyTestClient();
     secretManager = new ContainerTokenSecretManager(
-        new SecurityConfig(conf), TimeUnit.DAYS.toMillis(1));
+        TimeUnit.DAYS.toMillis(1), secretKeyClient);
   }
 
   @Test
@@ -146,7 +150,8 @@ public class TestSecureOzoneContainer {
       conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
 
       DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
-      container = new OzoneContainer(dn, conf, getContext(dn), caClient);
+      container = new OzoneContainer(dn, conf, getContext(dn), caClient,
+          secretKeyClient);
       //Set scmId and manually start ozone container.
       container.start(UUID.randomUUID().toString());
 
@@ -159,7 +164,6 @@ public class TestSecureOzoneContainer {
         port = secConfig.getConfiguration().getInt(OzoneConfigKeys
                 .DFS_CONTAINER_IPC_PORT, DFS_CONTAINER_IPC_PORT_DEFAULT);
       }
-      secretManager.start(caClient);
 
       ugi.doAs((PrivilegedAction<Void>) () -> {
         try {
@@ -173,7 +177,7 @@ public class TestSecureOzoneContainer {
                 : Instant.now().plusSeconds(3600);
             ContainerTokenIdentifier tokenIdentifier =
                 new ContainerTokenIdentifier(user, containerID,
-                    caClient.getCertificate().getSerialNumber().toString(),
+                    secretKeyClient.getCurrentSecretKey().getId(),
                     expiryDate);
             token = secretManager.generateToken(tokenIdentifier);
           }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index 1890a95daa..3bd51c35db 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeyClient;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.hdds.security.token.ContainerTokenSecretManager;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.RatisTestHelper;
+import org.apache.hadoop.ozone.client.SecretKeyTestClient;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
@@ -118,6 +120,7 @@ public class TestSecureContainerServer {
       = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
   private static final OzoneConfiguration CONF = new OzoneConfiguration();
   private static CertificateClientTestImpl caClient;
+  private static SecretKeyClient secretKeyClient;
   private static OzoneBlockTokenSecretManager blockTokenSecretManager;
   private static ContainerTokenSecretManager containerTokenSecretManager;
 
@@ -129,17 +132,15 @@ public class TestSecureContainerServer {
     CONF.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
     CONF.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
     caClient = new CertificateClientTestImpl(CONF);
+    secretKeyClient = new SecretKeyTestClient();
 
-    SecurityConfig secConf = new SecurityConfig(CONF);
     long tokenLifetime = TimeUnit.HOURS.toMillis(1);
 
-    blockTokenSecretManager = new OzoneBlockTokenSecretManager(
-        secConf, tokenLifetime);
-    blockTokenSecretManager.start(caClient);
+    blockTokenSecretManager = new OzoneBlockTokenSecretManager(tokenLifetime,
+        secretKeyClient);
 
     containerTokenSecretManager = new ContainerTokenSecretManager(
-        secConf, tokenLifetime);
-    containerTokenSecretManager.start(caClient);
+        tokenLifetime, secretKeyClient);
   }
 
   @AfterClass
@@ -149,8 +150,6 @@ public class TestSecureContainerServer {
 
   @After
   public void cleanUp() throws IOException {
-    containerTokenSecretManager.stop();
-    blockTokenSecretManager.stop();
     FileUtils.deleteQuietly(new File(CONF.get(HDDS_DATANODE_DIR_KEY)));
   }
 
@@ -194,7 +193,7 @@ public class TestSecureContainerServer {
     }
     HddsDispatcher hddsDispatcher = new HddsDispatcher(
         conf, containerSet, volumeSet, handlers, context, metrics,
-        TokenVerifier.create(new SecurityConfig((conf)), caClient));
+        TokenVerifier.create(new SecurityConfig(conf), secretKeyClient));
     hddsDispatcher.setClusterId(scmId.toString());
     return hddsDispatcher;
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 35361ef4bc..6f360686ea 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -95,6 +95,10 @@ import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.security.OzoneSecurityException;
+import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
+import org.apache.hadoop.hdds.security.symmetric.DefaultSecretKeySignerClient;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.ozone.security.OMCertificateClient;
@@ -162,7 +166,6 @@ import org.apache.hadoop.ozone.om.protocolPB.OMAdminProtocolClientSideImpl;
 import org.apache.hadoop.ozone.om.protocolPB.OMAdminProtocolPB;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import org.apache.hadoop.ozone.common.ha.ratis.RatisSnapshotInfo;
-import org.apache.hadoop.hdds.security.OzoneSecurityException;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
@@ -183,7 +186,6 @@ import org.apache.hadoop.ozone.protocolPB.OMInterServiceProtocolServerSideImpl;
 import org.apache.hadoop.ozone.protocolPB.OMAdminProtocolServerSideImpl;
 import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
@@ -324,6 +326,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private OzoneDelegationTokenSecretManager delegationTokenMgr;
   private OzoneBlockTokenSecretManager blockTokenMgr;
   private CertificateClient certClient;
+  private SecretKeySignerClient secretKeyClient;
   private String caCertPem = null;
   private List<String> caCertPemList = new ArrayList<>();
   private final Text omRpcAddressTxt;
@@ -601,9 +604,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       certClient = new OMCertificateClient(secConfig, omStorage,
           scmInfo == null ? null : scmInfo.getScmId(), this::saveNewCertId,
           this::terminateOM);
+      secretKeyClient = DefaultSecretKeySignerClient.create(conf);
     }
     if (secConfig.isBlockTokenEnabled()) {
-      blockTokenMgr = createBlockTokenSecretManager(configuration);
+      blockTokenMgr = createBlockTokenSecretManager();
     }
 
     // Enable S3 multi-tenancy if config keys are set
@@ -986,38 +990,18 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         .build();
   }
 
-  private OzoneBlockTokenSecretManager createBlockTokenSecretManager(
-      OzoneConfiguration conf) {
-
-    long expiryTime = conf.getTimeDuration(
+  private OzoneBlockTokenSecretManager createBlockTokenSecretManager() {
+    long expiryTime = configuration.getTimeDuration(
         HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME,
         HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME_DEFAULT,
         TimeUnit.MILLISECONDS);
-    long certificateGracePeriod = Duration.parse(
-        conf.get(HddsConfigKeys.HDDS_X509_RENEW_GRACE_DURATION,
-            HddsConfigKeys.HDDS_X509_RENEW_GRACE_DURATION_DEFAULT)).toMillis();
-    if (expiryTime > certificateGracePeriod) {
-      throw new IllegalArgumentException("Certificate grace period " +
-          HddsConfigKeys.HDDS_X509_RENEW_GRACE_DURATION +
-          " should be greater than maximum block token lifetime " +
-          HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME);
-    }
-    // TODO: Pass OM cert serial ID.
-    if (testSecureOmFlag) {
-      return new OzoneBlockTokenSecretManager(secConfig, expiryTime);
-    }
-    Objects.requireNonNull(certClient);
-    return new OzoneBlockTokenSecretManager(secConfig, expiryTime);
+    return new OzoneBlockTokenSecretManager(expiryTime, secretKeyClient);
   }
 
   private void stopSecretManager() {
-    if (blockTokenMgr != null) {
-      LOG.info("Stopping OM block token manager.");
-      try {
-        blockTokenMgr.stop();
-      } catch (IOException e) {
-        LOG.error("Failed to stop block token manager", e);
-      }
+    if (secretKeyClient != null) {
+      LOG.info("Stopping secret key client.");
+      secretKeyClient.stop();
     }
 
     if (delegationTokenMgr != null) {
@@ -1038,13 +1022,13 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       LOG.error("Unable to read key pair for OM.", e);
       throw new UncheckedIOException(e);
     }
+
     if (secConfig.isBlockTokenEnabled() && blockTokenMgr != null) {
+      LOG.info("Starting secret key client.");
       try {
-        LOG.info("Starting OM block token secret manager");
-        blockTokenMgr.start(certClient);
+        secretKeyClient.start(configuration);
       } catch (IOException e) {
-        // Unable to start secret manager.
-        LOG.error("Error starting block token secret manager.", e);
+        LOG.error("Unable to initialize secret key.", e);
         throw new UncheckedIOException(e);
       }
     }
@@ -1070,6 +1054,17 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     this.certClient = certClient;
   }
 
+  /**
+   * For testing purpose only. This allows testing token in integration test
+   * without fully setting up a working secure cluster.
+   */
+  @VisibleForTesting
+  public void setSecretKeyClient(
+      SecretKeySignerClient secretKeyClient) {
+    this.secretKeyClient = secretKeyClient;
+    blockTokenMgr.setSecretKeyClient(secretKeyClient);
+  }
+
   /**
    * Login OM service user if security and Kerberos are enabled.
    *
@@ -2249,8 +2244,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private void startSecretManagerIfNecessary() {
     boolean shouldRun = isOzoneSecurityEnabled();
     if (shouldRun) {
-      boolean running = delegationTokenMgr.isRunning()
-          && blockTokenMgr.isRunning();
+      boolean running = delegationTokenMgr.isRunning();
       if (!running) {
         startSecretManager();
       }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneManagerBlockToken.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneManagerBlockToken.java
deleted file mode 100644
index 3297e3d992..0000000000
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneManagerBlockToken.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.security;
-
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.lang3.RandomUtils;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
-import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
-import org.apache.ozone.test.GenericTestUtils;
-import org.apache.hadoop.util.Time;
-import org.junit.After;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.crypto.KeyGenerator;
-import javax.crypto.Mac;
-import javax.crypto.SecretKey;
-import java.io.File;
-import java.io.IOException;
-import java.security.GeneralSecurityException;
-import java.security.InvalidKeyException;
-import java.security.KeyPair;
-import java.security.NoSuchAlgorithmException;
-import java.security.NoSuchProviderException;
-import java.security.PrivateKey;
-import java.security.Signature;
-import java.security.SignatureException;
-import java.security.cert.Certificate;
-import java.security.cert.CertificateEncodingException;
-import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Test class for OzoneManagerDelegationToken.
- */
-public class TestOzoneManagerBlockToken {
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(TestOzoneManagerBlockToken.class);
-  private static final String BASEDIR = GenericTestUtils
-      .getTempPath(TestOzoneManagerBlockToken.class.getSimpleName());
-  private static final String KEYSTORES_DIR =
-      new File(BASEDIR).getAbsolutePath();
-  private static long expiryTime;
-  private static KeyPair keyPair;
-  private static X509Certificate cert;
-  private static final long MAX_LEN = 1000;
-
-  @BeforeClass
-  public static void setUp() throws Exception {
-    File base = new File(BASEDIR);
-    FileUtil.fullyDelete(base);
-    base.mkdirs();
-    expiryTime = Time.monotonicNow() + 60 * 60 * 24;
-
-    // Create Ozone Master key pair.
-    keyPair = KeyStoreTestUtil.generateKeyPair("RSA");
-    // Create Ozone Master certificate (SCM CA issued cert) and key store.
-    cert = KeyStoreTestUtil
-        .generateCertificate("CN=OzoneMaster", keyPair, 30, "SHA256withRSA");
-  }
-
-  @After
-  public void cleanUp() {
-  }
-
-  @Test
-  public void testSignToken() throws GeneralSecurityException, IOException {
-    String keystore = new File(KEYSTORES_DIR, "keystore.jks")
-        .getAbsolutePath();
-    String truststore = new File(KEYSTORES_DIR, "truststore.jks")
-        .getAbsolutePath();
-    String trustPassword = "trustPass";
-    String keyStorePassword = "keyStorePass";
-    String keyPassword = "keyPass";
-
-
-    KeyStoreTestUtil.createKeyStore(keystore, keyStorePassword, keyPassword,
-        "OzoneMaster", keyPair.getPrivate(), cert);
-
-    // Create trust store and put the certificate in the trust store
-    Map<String, X509Certificate> certs = Collections.singletonMap("server",
-        cert);
-    KeyStoreTestUtil.createTrustStore(truststore, trustPassword, certs);
-
-    // Sign the OzoneMaster Token with Ozone Master private key
-    PrivateKey privateKey = keyPair.getPrivate();
-    OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(
-        "testUser", "84940",
-        EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
-        expiryTime, cert.getSerialNumber().toString(), MAX_LEN);
-    byte[] signedToken = signTokenAsymmetric(tokenId, privateKey);
-
-    // Verify a valid signed OzoneMaster Token with Ozone Master
-    // public key(certificate)
-    boolean isValidToken = verifyTokenAsymmetric(tokenId, signedToken, cert);
-    LOG.info("{} is {}", tokenId, isValidToken ? "valid." : "invalid.");
-
-    // Verify an invalid signed OzoneMaster Token with Ozone Master
-    // public key(certificate)
-    tokenId = new OzoneBlockTokenIdentifier("", "",
-        EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
-        expiryTime, cert.getSerialNumber().toString(), MAX_LEN);
-    LOG.info("Unsigned token {} is {}", tokenId,
-        verifyTokenAsymmetric(tokenId, RandomUtils.nextBytes(128), cert));
-
-  }
-
-  public byte[] signTokenAsymmetric(OzoneBlockTokenIdentifier tokenId,
-      PrivateKey privateKey) throws NoSuchAlgorithmException,
-      InvalidKeyException, SignatureException {
-    Signature rsaSignature = Signature.getInstance("SHA256withRSA");
-    rsaSignature.initSign(privateKey);
-    rsaSignature.update(tokenId.getBytes());
-    byte[] signature = rsaSignature.sign();
-    return signature;
-  }
-
-  public boolean verifyTokenAsymmetric(OzoneBlockTokenIdentifier tokenId,
-      byte[] signature, Certificate certificate) throws InvalidKeyException,
-      NoSuchAlgorithmException, SignatureException {
-    Signature rsaSignature = Signature.getInstance("SHA256withRSA");
-    rsaSignature.initVerify(certificate);
-    rsaSignature.update(tokenId.getBytes());
-    boolean isValid = rsaSignature.verify(signature);
-    return isValid;
-  }
-
-  private byte[] signTokenSymmetric(OzoneBlockTokenIdentifier identifier,
-      Mac mac, SecretKey key) {
-    try {
-      mac.init(key);
-    } catch (InvalidKeyException ike) {
-      throw new IllegalArgumentException("Invalid key to HMAC computation",
-          ike);
-    }
-    return mac.doFinal(identifier.getBytes());
-  }
-
-  OzoneBlockTokenIdentifier generateTestToken() {
-    return new OzoneBlockTokenIdentifier(RandomStringUtils.randomAlphabetic(6),
-        RandomStringUtils.randomAlphabetic(5),
-        EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
-        expiryTime, cert.getSerialNumber().toString(), MAX_LEN);
-  }
-
-  @Test
-  public void testAsymmetricTokenPerf() throws NoSuchAlgorithmException,
-      CertificateEncodingException, NoSuchProviderException,
-      InvalidKeyException, SignatureException {
-    final int testTokenCount = 1000;
-    List<OzoneBlockTokenIdentifier> tokenIds = new ArrayList<>();
-    List<byte[]> tokenPasswordAsym = new ArrayList<>();
-    for (int i = 0; i < testTokenCount; i++) {
-      tokenIds.add(generateTestToken());
-    }
-
-    KeyPair kp = KeyStoreTestUtil.generateKeyPair("RSA");
-
-    // Create Ozone Master certificate (SCM CA issued cert) and key store
-    X509Certificate omCert;
-    omCert = KeyStoreTestUtil.generateCertificate("CN=OzoneMaster",
-        kp, 30, "SHA256withRSA");
-
-    long startTime = Time.monotonicNowNanos();
-    for (int i = 0; i < testTokenCount; i++) {
-      tokenPasswordAsym.add(
-          signTokenAsymmetric(tokenIds.get(i), kp.getPrivate()));
-    }
-    long duration = Time.monotonicNowNanos() - startTime;
-    LOG.info("Average token sign time with HmacSha256(RSA/1024 key) is {} ns",
-        duration / testTokenCount);
-
-    startTime = Time.monotonicNowNanos();
-    for (int i = 0; i < testTokenCount; i++) {
-      verifyTokenAsymmetric(tokenIds.get(i), tokenPasswordAsym.get(i), omCert);
-    }
-    duration = Time.monotonicNowNanos() - startTime;
-    LOG.info("Average token verify time with HmacSha256(RSA/1024 key) "
-        + "is {} ns", duration / testTokenCount);
-  }
-
-  @Test
-  public void testSymmetricTokenPerf() {
-    String hmacSHA1 = "HmacSHA1";
-    String hmacSHA256 = "HmacSHA256";
-
-    testSymmetricTokenPerfHelper(hmacSHA1, 64);
-    testSymmetricTokenPerfHelper(hmacSHA256, 1024);
-  }
-
-  public void testSymmetricTokenPerfHelper(String hmacAlgorithm, int keyLen) {
-    final int testTokenCount = 1000;
-    List<OzoneBlockTokenIdentifier> tokenIds = new ArrayList<>();
-    List<byte[]> tokenPasswordSym = new ArrayList<>();
-    for (int i = 0; i < testTokenCount; i++) {
-      tokenIds.add(generateTestToken());
-    }
-
-    KeyGenerator keyGen;
-    try {
-      keyGen = KeyGenerator.getInstance(hmacAlgorithm);
-      keyGen.init(keyLen);
-    } catch (NoSuchAlgorithmException nsa) {
-      throw new IllegalArgumentException("Can't find " + hmacAlgorithm +
-          " algorithm.");
-    }
-
-    Mac mac;
-    try {
-      mac = Mac.getInstance(hmacAlgorithm);
-    } catch (NoSuchAlgorithmException nsa) {
-      throw new IllegalArgumentException("Can't find " + hmacAlgorithm +
-          " algorithm.");
-    }
-
-    SecretKey secretKey = keyGen.generateKey();
-
-    long startTime = Time.monotonicNowNanos();
-    for (int i = 0; i < testTokenCount; i++) {
-      tokenPasswordSym.add(
-          signTokenSymmetric(tokenIds.get(i), mac, secretKey));
-    }
-    long duration = Time.monotonicNowNanos() - startTime;
-    LOG.info("Average token sign time with {}({} symmetric key) is {} ns",
-        hmacAlgorithm, keyLen, duration / testTokenCount);
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org