Posted to commits@ozone.apache.org by du...@apache.org on 2023/04/19 17:37:31 UTC

[ozone] branch HDDS-7733-Symmetric-Tokens updated: wip of secret key authorization

This is an automated email from the ASF dual-hosted git repository.

duong pushed a commit to branch HDDS-7733-Symmetric-Tokens
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7733-Symmetric-Tokens by this push:
     new 57320fc109 wip of secret key authorization
57320fc109 is described below

commit 57320fc10985c4835d308c0fc06441cac2b364c5
Author: Duong Nguyen <du...@gmail.com>
AuthorDate: Wed Apr 19 10:37:19 2023 -0700

    wip of secret key authorization
---
 .../security/exception/SCMSecretKeyException.java  |  47 ++++
 .../hadoop/hdds/protocol/SCMSecretKeyProtocol.java |  55 ++++
 .../hadoop/hdds/protocol/SCMSecurityProtocol.java  |  21 --
 ...SCMSecretKeyProtocolClientSideTranslatorPB.java | 165 +++++++++++
 .../protocolPB/SCMSecretKeyProtocolDatanodePB.java |  39 +++
 .../hdds/protocolPB/SCMSecretKeyProtocolOmPB.java  |  39 +++
 .../SCMSecurityProtocolClientSideTranslatorPB.java |  37 ---
 .../SCMSecretKeyProtocolFailoverProxyProvider.java | 310 +++++++++++++++++++++
 .../security/symmetric/DefaultSecretKeyClient.java |   6 +-
 .../symmetric/DefaultSecretKeySignerClient.java    |  18 +-
 .../symmetric/DefaultSecretKeyVerifierClient.java  |  10 +-
 .../hdds/security/symmetric/ManagedSecretKey.java  |   8 +-
 .../apache/hadoop/hdds/utils/HddsServerUtil.java   |  15 +
 .../src/main/proto/ScmSecretKeyProtocol.proto      | 111 ++++++++
 .../src/main/proto/ScmServerSecurityProtocol.proto |  42 +--
 .../hdds/scm/ha/io/ManagedSecretKeyCodec.java      |   6 +-
 ...SCMSecretKeyProtocolServerSideTranslatorPB.java | 167 +++++++++++
 .../SCMSecurityProtocolServerSideTranslatorPB.java |  47 ----
 .../hdds/scm/server/SCMSecurityProtocolServer.java |  36 ++-
 .../org/apache/hadoop/ozone/TestSecretKeysApi.java |  35 +--
 20 files changed, 1018 insertions(+), 196 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecretKeyException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecretKeyException.java
new file mode 100644
index 0000000000..2b2b251869
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecretKeyException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.security.exception;
+
+import java.io.IOException;
+
+/**
+ * Exception for all secret key related errors.
+ */
+public class SCMSecretKeyException extends IOException {
+  private final ErrorCode errorCode;
+
+  public SCMSecretKeyException(String message, ErrorCode errorCode) {
+    super(message);
+    this.errorCode = errorCode;
+  }
+
+  public ErrorCode getErrorCode() {
+    return errorCode;
+  }
+
+  /**
+   * Error codes to make it easy to decode these exceptions.
+   */
+  public enum ErrorCode {
+    OK,
+    INTERNAL_ERROR,
+    SECRET_KEY_NOT_ENABLED,
+    SECRET_KEY_NOT_INITIALIZED
+  }
+}
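
For illustration only (not part of this patch), a minimal sketch of how a caller
might react to the new error codes, assuming any SCMSecretKeyProtocol
implementation (the helper class name is hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.hdds.protocol.SCMSecretKeyProtocol;
    import org.apache.hadoop.hdds.security.exception.SCMSecretKeyException;
    import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;

    /** Hypothetical caller-side handling of the new error codes. */
    public final class SecretKeyErrorHandlingSketch {

      /** Returns the current key, or null if secret keys are not usable yet. */
      public static ManagedSecretKey currentKeyOrNull(SCMSecretKeyProtocol client)
          throws IOException {
        try {
          return client.getCurrentSecretKey();
        } catch (SCMSecretKeyException e) {
          switch (e.getErrorCode()) {
          case SECRET_KEY_NOT_ENABLED:
          case SECRET_KEY_NOT_INITIALIZED:
            // The cluster has not enabled or initialized symmetric secret keys.
            return null;
          default:
            // INTERNAL_ERROR and anything unexpected still propagate.
            throw e;
          }
        }
      }

      private SecretKeyErrorHandlingSketch() {
      }
    }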
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecretKeyProtocol.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecretKeyProtocol.java
new file mode 100644
index 0000000000..cd4cbbd760
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecretKeyProtocol.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.protocol;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.security.KerberosInfo;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * The protocol used to expose secret keys in SCM.
+ */
+@KerberosInfo(
+    serverPrincipal = ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY)
+@InterfaceAudience.Private
+public interface SCMSecretKeyProtocol {
+
+  /**
+   * Get the current SecretKey that is used for signing tokens.
+   * @return ManagedSecretKey
+   */
+  ManagedSecretKey getCurrentSecretKey() throws IOException;
+
+  /**
+   * Get a particular SecretKey by ID.
+   *
+   * @param id the id of the SecretKey to fetch.
+   * @return ManagedSecretKey.
+   */
+  ManagedSecretKey getSecretKey(UUID id) throws IOException;
+
+  /**
+   * Get all the non-expired SecretKeys managed by SCM.
+   * @return list of ManagedSecretKey.
+   */
+  List<ManagedSecretKey> getAllSecretKeys() throws IOException;
+}
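
A minimal usage sketch for the new interface, assuming the getScmSecretKeyClient()
helper added to HddsServerUtil further down in this patch (illustrative only; the
getId() accessor on ManagedSecretKey is assumed, and the class name is hypothetical):

    import java.io.IOException;
    import java.util.List;
    import java.util.UUID;

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.protocol.SCMSecretKeyProtocol;
    import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;

    import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecretKeyClient;

    /** Hypothetical client of the new secret key protocol. */
    public final class SecretKeyProtocolUsageSketch {

      public static void main(String[] args) throws IOException {
        OzoneConfiguration conf = new OzoneConfiguration();
        SCMSecretKeyProtocol client = getScmSecretKeyClient(conf);

        // Key currently used by SCM for signing tokens.
        ManagedSecretKey current = client.getCurrentSecretKey();

        // All non-expired keys, e.g. to verify tokens signed with older keys.
        List<ManagedSecretKey> validKeys = client.getAllSecretKeys();

        // Look up a single key by id (normally taken from a token identifier).
        UUID keyId = current.getId(); // getId() accessor assumed.
        ManagedSecretKey byId = client.getSecretKey(keyId);

        System.out.println("current=" + current
            + ", valid=" + validKeys.size() + ", byId=" + byId);
      }

      private SecretKeyProtocolUsageSketch() {
      }
    }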
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
index 1cfe568d8a..6040f43a51 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocol/SCMSecurityProtocol.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeDetailsProto;
 import org.apache.hadoop.hdds.scm.ScmConfig;
-import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.security.KerberosInfo;
 
@@ -173,24 +172,4 @@ public interface SCMSecurityProtocol {
   String getCertificate(NodeDetailsProto nodeDetails,
       String certSignReq) throws IOException;
 
-
-  /**
-   * Get the current SecretKey that is used for signing tokens.
-   * @return ManagedSecretKey
-   */
-  ManagedSecretKey getCurrentSecretKey() throws IOException;
-
-  /**
-   * Get a particular SecretKey by ID.
-   *
-   * @param id the id to get SecretKey.
-   * @return ManagedSecretKey.
-   */
-  ManagedSecretKey getSecretKey(UUID id) throws IOException;
-
-  /**
-   * Get all the non-expired SecretKey managed by SCM.
-   * @return list of ManagedSecretKey.
-   */
-  List<ManagedSecretKey> getAllSecretKeys() throws IOException;
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecretKeyProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecretKeyProtocolClientSideTranslatorPB.java
new file mode 100644
index 0000000000..66dce7156e
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecretKeyProtocolClientSideTranslatorPB.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.protocolPB;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.protocol.SCMSecretKeyProtocol;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMGetSecretKeyRequest;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMGetSecretKeyResponse;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyRequest;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyRequest.Builder;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyResponse;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.Type;
+import org.apache.hadoop.hdds.scm.proxy.SCMSecretKeyProtocolFailoverProxyProvider;
+import org.apache.hadoop.hdds.security.exception.SCMSecretKeyException;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ipc.RPC;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * This class is the client-side translator that forwards requests for
+ * {@link SCMSecretKeyProtocol} to the {@link SCMSecretKeyProtocolOmPB} proxy.
+ */
+public class SCMSecretKeyProtocolClientSideTranslatorPB implements
+    SCMSecretKeyProtocol, ProtocolTranslator, Closeable {
+
+  /**
+   * RpcController is not used and hence is set to null.
+   */
+  private static final RpcController NULL_RPC_CONTROLLER = null;
+  private final SCMSecretKeyProtocolService.BlockingInterface rpcProxy;
+  private SCMSecretKeyProtocolFailoverProxyProvider failoverProxyProvider;
+
+  public SCMSecretKeyProtocolClientSideTranslatorPB(
+      SCMSecretKeyProtocolFailoverProxyProvider proxyProvider) {
+    Preconditions.checkState(proxyProvider != null);
+    this.failoverProxyProvider = proxyProvider;
+    this.rpcProxy = (SCMSecretKeyProtocolOmPB) RetryProxy.create(
+        SCMSecretKeyProtocolOmPB.class, failoverProxyProvider,
+        failoverProxyProvider.getRetryPolicy());
+  }
+
+  /**
+   * Helper method to wrap the request and send the message.
+   */
+  private SCMSecretKeyResponse submitRequest(
+      Type type,
+      Consumer<Builder> builderConsumer) throws IOException {
+    final SCMSecretKeyResponse response;
+    try {
+
+      Builder builder = SCMSecretKeyRequest.newBuilder()
+          .setCmdType(type)
+          .setTraceID(TracingUtil.exportCurrentSpan());
+      builderConsumer.accept(builder);
+      SCMSecretKeyRequest wrapper = builder.build();
+
+      response = rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper);
+
+      handleError(response);
+
+    } catch (ServiceException ex) {
+      throw ProtobufHelper.getRemoteException(ex);
+    }
+    return response;
+  }
+
+  private SCMSecretKeyResponse handleError(SCMSecretKeyResponse resp)
+      throws SCMSecretKeyException {
+    if (resp.getStatus() != SCMSecretKeyProtocolProtos.Status.OK) {
+      throw new SCMSecretKeyException(resp.getMessage(),
+          SCMSecretKeyException.ErrorCode.values()[resp.getStatus().ordinal()]);
+    }
+    return resp;
+  }
+  /**
+   * Closes this stream and releases any system resources associated
+   * with it. If the stream is already closed then invoking this
+   * method has no effect.
+   *
+   * <p> As noted in {@link AutoCloseable#close()}, cases where the
+   * close may fail require careful attention. It is strongly advised
+   * to relinquish the underlying resources and to internally
+   * <em>mark</em> the {@code Closeable} as closed, prior to throwing
+   * the {@code IOException}.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+    RPC.stopProxy(rpcProxy);
+  }
+
+
+  @Override
+  public ManagedSecretKey getCurrentSecretKey() throws IOException {
+    SCMSecretKeyProtocolProtos.ManagedSecretKey secretKeyProto =
+        submitRequest(Type.GetCurrentSecretKey, builder -> {
+        }).getCurrentSecretKeyResponseProto().getSecretKey();
+    return ManagedSecretKey.fromProtobuf(secretKeyProto);
+  }
+
+  @Override
+  public ManagedSecretKey getSecretKey(UUID id) throws IOException {
+    SCMGetSecretKeyRequest request = SCMGetSecretKeyRequest.newBuilder()
+        .setSecretKeyId(HddsProtos.UUID.newBuilder()
+            .setMostSigBits(id.getMostSignificantBits())
+            .setLeastSigBits(id.getLeastSignificantBits())).build();
+    SCMGetSecretKeyResponse response = submitRequest(Type.GetSecretKey,
+        builder -> builder.setGetSecretKeyRequest(request))
+        .getGetSecretKeyResponseProto();
+
+    return response.hasSecretKey() ?
+        ManagedSecretKey.fromProtobuf(response.getSecretKey()) : null;
+  }
+
+  @Override
+  public List<ManagedSecretKey> getAllSecretKeys() throws IOException {
+    List<SCMSecretKeyProtocolProtos.ManagedSecretKey> secretKeysList =
+        submitRequest(Type.GetAllSecretKeys, builder -> {
+        }).getSecretKeysListResponseProto().getSecretKeysList();
+    return secretKeysList.stream()
+        .map(ManagedSecretKey::fromProtobuf)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Return the proxy object underlying this protocol translator.
+   *
+   * @return the proxy object underlying this protocol translator.
+   */
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecretKeyProtocolDatanodePB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecretKeyProtocolDatanodePB.java
new file mode 100644
index 0000000000..168eda42e5
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecretKeyProtocolDatanodePB.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.protocolPB;
+
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+
+/**
+ * Protocol for secret key related operations on SCM, used by Datanodes.
+ */
+@ProtocolInfo(protocolName =
+    "org.apache.hadoop.hdds.protocol.SCMSecurityProtocol",
+    protocolVersion = 1)
+@KerberosInfo(
+    serverPrincipal = HDDS_SCM_KERBEROS_PRINCIPAL_KEY,
+    clientPrincipal = DFS_DATANODE_KERBEROS_PRINCIPAL_KEY
+)
+public interface SCMSecretKeyProtocolDatanodePB extends
+    SCMSecretKeyProtocolService.BlockingInterface {
+
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecretKeyProtocolOmPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecretKeyProtocolOmPB.java
new file mode 100644
index 0000000000..84f28c37f6
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecretKeyProtocolOmPB.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.protocolPB;
+
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.security.KerberosInfo;
+
+import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
+
+/**
+ * Protocol for secret key related operations on SCM, used by the OM.
+ */
+@ProtocolInfo(protocolName =
+    "org.apache.hadoop.hdds.protocol.SCMSecurityProtocol",
+    protocolVersion = 1)
+@KerberosInfo(
+    serverPrincipal = HDDS_SCM_KERBEROS_PRINCIPAL_KEY
+//    clientPrincipal = OZONE_OM_KERBEROS_PRINCIPAL_KEY
+)
+public interface SCMSecretKeyProtocolOmPB extends
+    SCMSecretKeyProtocolService.BlockingInterface {
+
+}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
index ab09061c44..d40551e71c 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
@@ -22,9 +22,7 @@ import java.security.cert.CRLException;
 import java.security.cert.CertificateException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.UUID;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
@@ -42,8 +40,6 @@ import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCer
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCrlsRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSecretKeyRequest;
-import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSecretKeyResponse;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCACertificateRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetLatestCrlIdRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCertificateRequestProto;
@@ -55,7 +51,6 @@ import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecuri
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Type;
 import org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
-import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.io.retry.RetryProxy;
@@ -195,38 +190,6 @@ public class SCMSecurityProtocolClientSideTranslatorPB implements
         .getX509Certificate();
   }
 
-  @Override
-  public ManagedSecretKey getCurrentSecretKey() throws IOException {
-    SCMSecurityProtocolProtos.ManagedSecretKey secretKeyProto =
-        submitRequest(Type.GetCurrentSecretKey, builder -> {
-        }).getCurrentSecretKeyResponseProto().getSecretKey();
-    return ManagedSecretKey.fromProtobuf(secretKeyProto);
-  }
-
-  @Override
-  public ManagedSecretKey getSecretKey(UUID id) throws IOException {
-    SCMGetSecretKeyRequest request = SCMGetSecretKeyRequest.newBuilder()
-        .setSecretKeyId(HddsProtos.UUID.newBuilder()
-            .setMostSigBits(id.getMostSignificantBits())
-            .setLeastSigBits(id.getLeastSignificantBits())).build();
-    SCMGetSecretKeyResponse response = submitRequest(Type.GetSecretKey,
-        builder -> builder.setGetSecretKeyRequest(request))
-        .getGetSecretKeyResponseProto();
-
-    return response.hasSecretKey() ?
-        ManagedSecretKey.fromProtobuf(response.getSecretKey()) : null;
-  }
-
-  @Override
-  public List<ManagedSecretKey> getAllSecretKeys() throws IOException {
-    List<SCMSecurityProtocolProtos.ManagedSecretKey> secretKeysList =
-        submitRequest(Type.GetAllSecretKeys, builder -> {
-        }).getSecretKeysListResponseProto().getSecretKeysList();
-    return secretKeysList.stream()
-        .map(ManagedSecretKey::fromProtobuf)
-        .collect(Collectors.toList());
-  }
-
   /**
    * Get signed certificate for SCM node.
    *
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecretKeyProtocolFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecretKeyProtocolFailoverProxyProvider.java
new file mode 100644
index 0000000000..ba22b3fe5e
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecretKeyProtocolFailoverProxyProvider.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.proxy;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.ConfigurationException;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService;
+import org.apache.hadoop.hdds.protocolPB.SCMSecretKeyProtocolOmPB;
+import org.apache.hadoop.hdds.ratis.ServerNotLeaderException;
+import org.apache.hadoop.hdds.scm.ha.SCMHAUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Failover proxy provider for the SCMSecretKeyProtocol server.
+ */
+public class SCMSecretKeyProtocolFailoverProxyProvider implements
+    FailoverProxyProvider<SCMSecretKeyProtocolOmPB>, Closeable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMSecretKeyProtocolFailoverProxyProvider.class);
+
+  // scmNodeId -> ProxyInfo<rpcProxy>
+  private final Map<String,
+      ProxyInfo<SCMSecretKeyProtocolService.BlockingInterface>> scmProxies;
+
+  // scmNodeId -> SCMProxyInfo
+  private final Map<String, SCMProxyInfo> scmProxyInfoMap;
+
+  private List<String> scmNodeIds;
+
+  // As the SCM client is shared across threads, performFailover()
+  // updates currentProxySCMNodeId based on updatedLeaderNodeID, which is
+  // set in shouldRetry(). When two or more threads run in parallel, the
+  // RetryInvocationHandler checks the expected failover count and lets
+  // only one of them execute performFailover(); the other thread(s) skip
+  // it and call getProxy(), which uses currentProxySCMNodeId to return
+  // the proxy.
+  private volatile String currentProxySCMNodeId;
+  private volatile int currentProxyIndex;
+
+
+  private final ConfigurationSource conf;
+  private final SCMClientConfig scmClientConfig;
+  private final long scmVersion;
+
+  private String scmServiceId;
+
+  private final int maxRetryCount;
+  private final long retryInterval;
+
+  private final UserGroupInformation ugi;
+
+  private String updatedLeaderNodeID = null;
+
+  /**
+   * Construct the failover proxy provider for the SCMSecretKeyProtocol server.
+   * @param conf configuration source used to resolve SCM node addresses.
+   * @param userGroupInformation UGI used to establish the RPC connections.
+   */
+  public SCMSecretKeyProtocolFailoverProxyProvider(ConfigurationSource conf,
+      UserGroupInformation userGroupInformation) {
+    Preconditions.checkNotNull(userGroupInformation);
+    this.ugi = userGroupInformation;
+    this.conf = conf;
+    this.scmVersion = RPC.getProtocolVersion(SCMSecretKeyProtocolOmPB.class);
+
+    this.scmProxies = new HashMap<>();
+    this.scmProxyInfoMap = new HashMap<>();
+    loadConfigs();
+
+    this.currentProxyIndex = 0;
+    currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
+    scmClientConfig = conf.getObject(SCMClientConfig.class);
+    this.maxRetryCount = scmClientConfig.getRetryCount();
+    this.retryInterval = scmClientConfig.getRetryInterval();
+  }
+
+  protected synchronized void loadConfigs() {
+    List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
+    scmNodeIds = new ArrayList<>();
+
+    for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
+      if (scmNodeInfo.getScmSecurityAddress() == null) {
+        throw new ConfigurationException("SCM Client Address could not " +
+            "be obtained from config. Config is not properly defined");
+      } else {
+        InetSocketAddress scmSecurityAddress =
+            NetUtils.createSocketAddr(scmNodeInfo.getScmSecurityAddress());
+
+        scmServiceId = scmNodeInfo.getServiceId();
+        String scmNodeId = scmNodeInfo.getNodeId();
+
+        scmNodeIds.add(scmNodeId);
+        SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId,
+            scmSecurityAddress);
+        scmProxyInfoMap.put(scmNodeId, scmProxyInfo);
+      }
+    }
+  }
+
+  @Override
+  public synchronized ProxyInfo<SCMSecretKeyProtocolOmPB> getProxy() {
+    ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId());
+    if (currentProxyInfo == null) {
+      currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId());
+    }
+    return currentProxyInfo;
+  }
+
+  /**
+   * Creates proxy object.
+   */
+  private ProxyInfo createSCMProxy(String nodeId) {
+    ProxyInfo proxyInfo;
+    SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId);
+    InetSocketAddress address = scmProxyInfo.getAddress();
+    try {
+      SCMSecretKeyProtocolService.BlockingInterface scmProxy = createSCMProxy(address);
+      // Create proxyInfo here, to make it work with all Hadoop versions.
+      proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString());
+      scmProxies.put(nodeId, proxyInfo);
+      return proxyInfo;
+    } catch (IOException ioe) {
+      LOG.error("{} Failed to create RPC proxy to SCM at {}",
+          this.getClass().getSimpleName(), address, ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  private SCMSecretKeyProtocolService.BlockingInterface createSCMProxy(InetSocketAddress scmAddress)
+      throws IOException {
+    Configuration hadoopConf =
+        LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
+    RPC.setProtocolEngine(hadoopConf, SCMSecretKeyProtocolOmPB.class,
+        ProtobufRpcEngine.class);
+
+    // FailoverOnNetworkException ensures that the IPC layer does not attempt
+    // retries on the same SCM in case of connection exception. This retry
+    // policy essentially results in TRY_ONCE_THEN_FAIL.
+
+    RetryPolicy connectionRetryPolicy = RetryPolicies
+        .failoverOnNetworkException(0);
+
+    return RPC.getProtocolProxy(SCMSecretKeyProtocolOmPB.class,
+        scmVersion, scmAddress, ugi,
+        hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf),
+        (int)scmClientConfig.getRpcTimeOut(), connectionRetryPolicy).getProxy();
+  }
+
+
+  @Override
+  public synchronized void performFailover(SCMSecretKeyProtocolOmPB currentProxy) {
+    if (updatedLeaderNodeID != null) {
+      currentProxySCMNodeId = updatedLeaderNodeID;
+    } else {
+      nextProxyIndex();
+    }
+    LOG.debug("Failing over to next proxy. {}", getCurrentProxySCMNodeId());
+  }
+
+  public synchronized void performFailoverToAssignedLeader(String newLeader,
+      Exception e) {
+    ServerNotLeaderException snle =
+        (ServerNotLeaderException) SCMHAUtils.getServerNotLeaderException(e);
+    if (snle != null && snle.getSuggestedLeader() != null) {
+      Optional<SCMProxyInfo> matchedProxyInfo =
+          scmProxyInfoMap.values().stream().filter(
+              proxyInfo -> NetUtils.getHostPortString(proxyInfo.getAddress())
+                  .equals(snle.getSuggestedLeader())).findFirst();
+      if (matchedProxyInfo.isPresent()) {
+        newLeader = matchedProxyInfo.get().getNodeId();
+        LOG.debug("Performing failover to suggested leader {}, nodeId {}",
+            snle.getSuggestedLeader(), newLeader);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Suggested leader {} does not match with any of the " +
+                          "proxyInfo adress {}", snle.getSuggestedLeader(),
+                  Arrays.toString(scmProxyInfoMap.values().toArray()));
+        }
+      }
+    }
+    assignLeaderToNode(newLeader);
+  }
+
+
+  private synchronized void assignLeaderToNode(String newLeaderNodeId) {
+    if (!currentProxySCMNodeId.equals(newLeaderNodeId)) {
+      if (scmProxyInfoMap.containsKey(newLeaderNodeId)) {
+        updatedLeaderNodeID = newLeaderNodeId;
+        LOG.debug("Updated LeaderNodeID {}", updatedLeaderNodeID);
+      } else {
+        updatedLeaderNodeID = null;
+      }
+    }
+  }
+
+  /**
+   * Advance the proxy index to the next proxy in the list,
+   * wrapping around in round-robin order.
+   */
+  private synchronized void nextProxyIndex() {
+    // round robin to the next proxy
+    currentProxyIndex = (getCurrentProxyIndex() + 1) % scmProxyInfoMap.size();
+    currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
+  }
+
+  public RetryPolicy getRetryPolicy() {
+    // The client will attempt up to maxRetryCount failovers between
+    // available SCMs before throwing an exception.
+    RetryPolicy retryPolicy = new RetryPolicy() {
+      @Override
+      public RetryAction shouldRetry(Exception exception, int retries,
+          int failovers, boolean isIdempotentOrAtMostOnce)
+          throws Exception {
+
+        if (LOG.isDebugEnabled()) {
+          if (exception.getCause() != null) {
+            LOG.debug("RetryProxy: SCM Security Server {}: {}: {}",
+                getCurrentProxySCMNodeId(),
+                exception.getCause().getClass().getSimpleName(),
+                exception.getCause().getMessage());
+          } else {
+            LOG.debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(),
+                exception.getMessage());
+          }
+        }
+
+        if (SCMHAUtils.checkRetriableWithNoFailoverException(exception)) {
+          setUpdatedLeaderNodeID();
+        } else {
+          performFailoverToAssignedLeader(null, exception);
+        }
+        return SCMHAUtils
+            .getRetryAction(failovers, retries, exception, maxRetryCount,
+                getRetryInterval());
+      }
+    };
+
+    return retryPolicy;
+  }
+
+  public synchronized void setUpdatedLeaderNodeID() {
+    this.updatedLeaderNodeID = getCurrentProxySCMNodeId();
+  }
+
+  @Override
+  public Class<SCMSecretKeyProtocolOmPB> getInterface() {
+    return SCMSecretKeyProtocolOmPB.class;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    for (ProxyInfo<SCMSecretKeyProtocolService.BlockingInterface> proxyInfo :
+        scmProxies.values()) {
+      if (proxyInfo != null && proxyInfo.proxy != null) {
+        // Stop the underlying RPC proxy held by the ProxyInfo wrapper.
+        RPC.stopProxy(proxyInfo.proxy);
+      }
+    }
+    scmProxies.clear();
+  }
+
+  public synchronized String getCurrentProxySCMNodeId() {
+    return currentProxySCMNodeId;
+  }
+
+  public synchronized int getCurrentProxyIndex() {
+    return currentProxyIndex;
+  }
+
+  private long getRetryInterval() {
+    return retryInterval;
+  }
+}
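
To show how this provider is meant to be composed (mirroring the constructor of
SCMSecretKeyProtocolClientSideTranslatorPB and the HddsServerUtil helper added in
this patch), a minimal wiring sketch using the current user's UGI (the class name
is hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.hdds.conf.ConfigurationSource;
    import org.apache.hadoop.hdds.protocolPB.SCMSecretKeyProtocolClientSideTranslatorPB;
    import org.apache.hadoop.hdds.scm.proxy.SCMSecretKeyProtocolFailoverProxyProvider;
    import org.apache.hadoop.security.UserGroupInformation;

    /** Hypothetical wiring of the failover provider into a retrying client. */
    public final class SecretKeyClientWiringSketch {

      public static SCMSecretKeyProtocolClientSideTranslatorPB wire(
          ConfigurationSource conf) throws IOException {
        SCMSecretKeyProtocolFailoverProxyProvider proxyProvider =
            new SCMSecretKeyProtocolFailoverProxyProvider(conf,
                UserGroupInformation.getCurrentUser());
        // The translator constructor wraps the provider with RetryProxy.create()
        // using proxyProvider.getRetryPolicy(), so SCM failover is transparent
        // to callers of the SCMSecretKeyProtocol methods.
        return new SCMSecretKeyProtocolClientSideTranslatorPB(proxyProvider);
      }

      private SecretKeyClientWiringSketch() {
      }
    }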
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyClient.java
index d77fee778e..ca929c9916 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyClient.java
@@ -18,13 +18,13 @@
 package org.apache.hadoop.hdds.security.symmetric;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.protocol.SCMSecretKeyProtocol;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 
 import java.io.IOException;
 import java.util.UUID;
 
-import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecretKeyClient;
 
 /**
  * A composition of {@link DefaultSecretKeySignerClient} and
@@ -64,7 +64,7 @@ public class DefaultSecretKeyClient implements SecretKeyClient {
 
   public static SecretKeyClient create(ConfigurationSource conf)
       throws IOException {
-    SCMSecurityProtocol securityProtocol = getScmSecurityClient(conf);
+    SCMSecretKeyProtocol securityProtocol = getScmSecretKeyClient(conf);
     SecretKeySignerClient singerClient =
         new DefaultSecretKeySignerClient(securityProtocol);
     SecretKeyVerifierClient verifierClient =
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java
index a1056f9139..c974d708ab 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeySignerClient.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.security.symmetric;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.protocol.SCMSecretKeyProtocol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Objects.requireNonNull;
-import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecretKeyClient;
 
 /**
  * Default implementation of {@link SecretKeySignerClient} that fetches
@@ -45,14 +45,14 @@ public class DefaultSecretKeySignerClient implements SecretKeySignerClient {
   private static final Logger LOG =
       LoggerFactory.getLogger(DefaultSecretKeySignerClient.class);
 
-  private final SCMSecurityProtocol scmSecurityProtocol;
+  private final SCMSecretKeyProtocol scmSecretKeyProtocol;
   private final AtomicReference<ManagedSecretKey> cache =
       new AtomicReference<>();
   private ScheduledExecutorService executorService;
 
   public DefaultSecretKeySignerClient(
-      SCMSecurityProtocol scmSecurityProtocol) {
-    this.scmSecurityProtocol = scmSecurityProtocol;
+      SCMSecretKeyProtocol scmSecretKeyProtocol) {
+    this.scmSecretKeyProtocol = scmSecretKeyProtocol;
   }
 
   @Override
@@ -64,7 +64,7 @@ public class DefaultSecretKeySignerClient implements SecretKeySignerClient {
   @Override
   public void start(ConfigurationSource conf) throws IOException {
     final ManagedSecretKey initialKey =
-        scmSecurityProtocol.getCurrentSecretKey();
+        scmSecretKeyProtocol.getCurrentSecretKey();
     LOG.info("Initial secret key fetched from SCM: {}.", initialKey);
     cache.set(initialKey);
     scheduleSecretKeyPoller(conf, initialKey.getCreationTime());
@@ -109,7 +109,7 @@ public class DefaultSecretKeySignerClient implements SecretKeySignerClient {
     // from SCM.
     if (nextRotate.isBefore(Instant.now())) {
       try {
-        ManagedSecretKey newKey = scmSecurityProtocol.getCurrentSecretKey();
+        ManagedSecretKey newKey = scmSecretKeyProtocol.getCurrentSecretKey();
         if (!newKey.equals(current)) {
           cache.set(newKey);
           LOG.info("New secret key fetched from SCM: {}.", newKey);
@@ -124,7 +124,7 @@ public class DefaultSecretKeySignerClient implements SecretKeySignerClient {
 
   public static DefaultSecretKeySignerClient create(ConfigurationSource conf)
       throws IOException {
-    SCMSecurityProtocol securityProtocol = getScmSecurityClient(conf);
-    return new DefaultSecretKeySignerClient(securityProtocol);
+    SCMSecretKeyProtocol secretKeyProtocol = getScmSecretKeyClient(conf);
+    return new DefaultSecretKeySignerClient(secretKeyProtocol);
   }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyVerifierClient.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyVerifierClient.java
index 56478793cb..c6d5440ea4 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyVerifierClient.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/DefaultSecretKeyVerifierClient.java
@@ -21,6 +21,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.SCMSecretKeyProtocol;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.slf4j.Logger;
@@ -34,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdds.security.symmetric.SecretKeyConfig.parseExpiryDuration;
 import static org.apache.hadoop.hdds.security.symmetric.SecretKeyConfig.parseRotateDuration;
+import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecretKeyClient;
 import static org.apache.hadoop.hdds.utils.HddsServerUtil.getScmSecurityClient;
 
 /**
@@ -46,7 +48,7 @@ public class DefaultSecretKeyVerifierClient implements SecretKeyVerifierClient {
 
   private final LoadingCache<UUID, ManagedSecretKey> cache;
 
-  DefaultSecretKeyVerifierClient(SCMSecurityProtocol scmSecurityProtocol,
+  DefaultSecretKeyVerifierClient(SCMSecretKeyProtocol scmSecretKeyProtocol,
                                  ConfigurationSource conf) {
     Duration expiryDuration = parseExpiryDuration(conf);
     Duration rotateDuration = parseRotateDuration(conf);
@@ -56,7 +58,7 @@ public class DefaultSecretKeyVerifierClient implements SecretKeyVerifierClient {
         new CacheLoader<UUID, ManagedSecretKey>() {
           @Override
           public ManagedSecretKey load(UUID id) throws Exception {
-            ManagedSecretKey secretKey = scmSecurityProtocol.getSecretKey(id);
+            ManagedSecretKey secretKey = scmSecretKeyProtocol.getSecretKey(id);
             LOG.info("Secret key fetched from SCM: {}", secretKey);
             return secretKey;
           }
@@ -93,7 +95,7 @@ public class DefaultSecretKeyVerifierClient implements SecretKeyVerifierClient {
 
   public static DefaultSecretKeyVerifierClient create(ConfigurationSource conf)
       throws IOException {
-    SCMSecurityProtocol securityProtocol = getScmSecurityClient(conf);
-    return new DefaultSecretKeyVerifierClient(securityProtocol, conf);
+    SCMSecretKeyProtocol secretKeyProtocol = getScmSecretKeyClient(conf);
+    return new DefaultSecretKeyVerifierClient(secretKeyProtocol, conf);
   }
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
index 78e4fc0b90..2ff44daf9b 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/symmetric/ManagedSecretKey.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.hdds.security.symmetric;
 
 import com.google.protobuf.ByteString;
-import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.ProtobufUtils;
 
@@ -129,8 +129,8 @@ public final class ManagedSecretKey {
   /**
    * @return the protobuf message to deserialize this object.
    */
-  public SCMSecurityProtocolProtos.ManagedSecretKey toProtobuf() {
-    return SCMSecurityProtocolProtos.ManagedSecretKey.newBuilder()
+  public SCMSecretKeyProtocolProtos.ManagedSecretKey toProtobuf() {
+    return SCMSecretKeyProtocolProtos.ManagedSecretKey.newBuilder()
         .setId(ProtobufUtils.toProtobuf(id))
         .setCreationTime(this.creationTime.toEpochMilli())
         .setExpiryTime(this.expiryTime.toEpochMilli())
@@ -143,7 +143,7 @@ public final class ManagedSecretKey {
    * Create a {@link ManagedSecretKey} from a given protobuf message.
    */
   public static ManagedSecretKey fromProtobuf(
-      SCMSecurityProtocolProtos.ManagedSecretKey message) {
+      SCMSecretKeyProtocolProtos.ManagedSecretKey message) {
     UUID id = ProtobufUtils.fromProtobuf(message.getId());
     Instant creationTime = Instant.ofEpochMilli(message.getCreationTime());
     Instant expiryTime = Instant.ofEpochMilli(message.getExpiryTime());
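
For reference, a minimal round trip through the relocated proto type; the
algorithm, key bytes, and expiry below are placeholders, not values taken from
this patch, and the class name is hypothetical:

    import java.time.Duration;
    import java.time.Instant;
    import java.util.UUID;

    import com.google.protobuf.ByteString;
    import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos;
    import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
    import org.apache.hadoop.util.ProtobufUtils;

    /** Hypothetical round trip: proto -> ManagedSecretKey -> proto. */
    public final class ManagedSecretKeyProtoSketch {

      public static void main(String[] args) {
        Instant now = Instant.now();
        SCMSecretKeyProtocolProtos.ManagedSecretKey message =
            SCMSecretKeyProtocolProtos.ManagedSecretKey.newBuilder()
                .setId(ProtobufUtils.toProtobuf(UUID.randomUUID()))
                .setCreationTime(now.toEpochMilli())
                .setExpiryTime(now.plus(Duration.ofDays(7)).toEpochMilli())
                .setAlgorithm("HmacSHA256")                    // placeholder
                .setEncoded(ByteString.copyFrom(new byte[32])) // placeholder key
                .build();

        ManagedSecretKey key = ManagedSecretKey.fromProtobuf(message);
        // toProtobuf() reproduces an equivalent proto message.
        SCMSecretKeyProtocolProtos.ManagedSecretKey back = key.toProtobuf();
        System.out.println(back.getAlgorithm() + " expires at "
            + Instant.ofEpochMilli(back.getExpiryTime()));
      }

      private ManagedSecretKeyProtoSketch() {
      }
    }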
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index 33d8c178c7..b907c1db3f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -43,12 +43,14 @@ import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.protocolPB.SCMSecretKeyProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.ratis.ServerNotLeaderException;
 import org.apache.hadoop.hdds.recon.ReconConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.hdds.scm.proxy.SCMClientConfig;
+import org.apache.hadoop.hdds.scm.proxy.SCMSecretKeyProtocolFailoverProxyProvider;
 import org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
@@ -504,6 +506,19 @@ public final class HddsServerUtil {
         SCMSecurityProtocol.class, conf);
   }
 
+  public static SCMSecretKeyProtocolClientSideTranslatorPB getScmSecretKeyClient(
+      ConfigurationSource conf) throws IOException {
+    return new SCMSecretKeyProtocolClientSideTranslatorPB(
+        new SCMSecretKeyProtocolFailoverProxyProvider(conf,
+            UserGroupInformation.getCurrentUser()));
+  }
+
+  public static SCMSecretKeyProtocolClientSideTranslatorPB getScmSecretKeyClient(
+      ConfigurationSource conf, UserGroupInformation ugi) {
+    return new SCMSecretKeyProtocolClientSideTranslatorPB(
+        new SCMSecretKeyProtocolFailoverProxyProvider(conf, ugi));
+  }
+
   /**
    * Initialize hadoop metrics system for Ozone servers.
    * @param configuration OzoneConfiguration to use.
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmSecretKeyProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmSecretKeyProtocol.proto
new file mode 100644
index 0000000000..88b00ff7c3
--- /dev/null
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmSecretKeyProtocol.proto
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for an *unstable* .proto interface.
+ */
+syntax = "proto2";
+
+option java_package = "org.apache.hadoop.hdds.protocol.proto";
+
+option java_outer_classname = "SCMSecretKeyProtocolProtos";
+
+option java_generic_services = true;
+
+option java_generate_equals_and_hash = true;
+
+package hadoop.hdds.security.symmetric;
+
+import "hdds.proto";
+
+/**
+All commands are sent as requests and all responses come back via the
+Response class. If adding new functions, please follow this protocol, since
+our tracing and visibility tools depend on this pattern.
+*/
+message SCMSecretKeyRequest {
+    required Type cmdType = 1; // Type of the command
+
+    optional string traceID = 2;
+
+    optional SCMGetSecretKeyRequest getSecretKeyRequest = 3;
+}
+
+message SCMSecretKeyResponse {
+    required Type cmdType = 1; // Type of the command
+
+    // A string that identifies this command. We generate the trace ID in the
+    // Ozone frontend, and it allows us to trace the command across Ozone.
+    optional string traceID = 2;
+
+    optional bool success = 3 [default = true];
+
+    optional string message = 4;
+
+    required Status status = 5;
+
+    optional SCMGetCurrentSecretKeyResponse currentSecretKeyResponseProto = 11;
+
+    optional SCMGetSecretKeyResponse getSecretKeyResponseProto = 12;
+
+    optional SCMSecretKeysListResponse secretKeysListResponseProto = 13;
+
+}
+
+enum Type {
+    GetCurrentSecretKey = 1;
+    GetSecretKey = 2;
+    GetAllSecretKeys = 3;
+}
+
+enum Status {
+    OK = 1;
+    INTERNAL_ERROR = 2;
+    SECRET_KEY_NOT_ENABLED = 3;
+    SECRET_KEY_NOT_INITIALIZED = 4;
+}
+
+service SCMSecretKeyProtocolService {
+    rpc submitRequest (SCMSecretKeyRequest) returns (SCMSecretKeyResponse);
+}
+
+message ManagedSecretKey {
+    required UUID id = 1;
+    required uint64 creationTime = 2;
+    required uint64 expiryTime = 3;
+    required string algorithm = 4;
+    required bytes encoded = 5;
+}
+
+message SCMGetSecretKeyRequest {
+    required UUID secretKeyId = 1;
+}
+
+message SCMGetCurrentSecretKeyResponse {
+    required ManagedSecretKey secretKey = 1;
+}
+
+message SCMGetSecretKeyResponse {
+    optional ManagedSecretKey secretKey = 1;
+}
+
+message SCMSecretKeysListResponse {
+    repeated ManagedSecretKey secretKeys = 1;
+}
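
As the comment at the top of this file says, every call travels in the single
SCMSecretKeyRequest/SCMSecretKeyResponse envelope. A minimal sketch of building a
GetSecretKey envelope by hand (the client-side translator in this patch does the
same internally; illustrative only, hypothetical class name):

    import java.util.UUID;

    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
    import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMGetSecretKeyRequest;
    import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyRequest;
    import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.Type;

    /** Hypothetical construction of the request envelope for GetSecretKey. */
    public final class SecretKeyRequestEnvelopeSketch {

      public static SCMSecretKeyRequest buildGetSecretKey(UUID keyId) {
        SCMGetSecretKeyRequest inner = SCMGetSecretKeyRequest.newBuilder()
            .setSecretKeyId(HddsProtos.UUID.newBuilder()
                .setMostSigBits(keyId.getMostSignificantBits())
                .setLeastSigBits(keyId.getLeastSignificantBits()))
            .build();
        return SCMSecretKeyRequest.newBuilder()
            .setCmdType(Type.GetSecretKey)     // required: selects the command
            .setGetSecretKeyRequest(inner)     // command-specific payload
            .build();
      }

      private SecretKeyRequestEnvelopeSketch() {
      }
    }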
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
index 27d1e3c1c3..1768444e07 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
@@ -56,7 +56,6 @@ message SCMSecurityRequest {
     optional SCMGetLatestCrlIdRequestProto getLatestCrlIdRequest = 11;
     optional SCMRevokeCertificatesRequestProto revokeCertificatesRequest = 12;
     optional SCMGetCertRequestProto getCertRequest = 13;
-    optional SCMGetSecretKeyRequest getSecretKeyRequest = 14;
 }
 
 message SCMSecurityResponse {
@@ -81,13 +80,6 @@ message SCMSecurityResponse {
     optional SCMGetLatestCrlIdResponseProto getLatestCrlIdResponseProto = 9;
 
     optional SCMRevokeCertificatesResponseProto revokeCertificatesResponseProto = 10;
-
-    optional SCMGetCurrentSecretKeyResponse currentSecretKeyResponseProto = 11;
-
-    optional SCMGetSecretKeyResponse getSecretKeyResponseProto = 12;
-
-    optional SCMSecretKeysListResponse secretKeysListResponseProto = 13;
-
 }
 
 enum Type {
@@ -103,9 +95,6 @@ enum Type {
     GetLatestCrlId = 10;
     RevokeCertificates = 11;
     GetCert = 12;
-    GetCurrentSecretKey = 13;
-    GetSecretKey = 14;
-    GetAllSecretKeys = 15;
 }
 
 enum Status {
@@ -126,8 +115,6 @@ enum Status {
     GET_ROOT_CA_CERTIFICATE_FAILED = 15;
     NOT_A_PRIMARY_SCM = 16;
     REVOKE_CERTIFICATE_FAILED = 17;
-    SECRET_KEY_NOT_ENABLED = 18;
-    SECRET_KEY_NOT_INITIALIZED = 19;
 }
 /**
 * This message is send by data node to prove its identity and get an SCM
@@ -260,31 +247,4 @@ message SCMRevokeCertificatesResponseProto {
 
 service SCMSecurityProtocolService {
     rpc submitRequest (SCMSecurityRequest) returns (SCMSecurityResponse);
-}
-
-message ManagedSecretKey {
-    required UUID id = 1;
-    required uint64 creationTime = 2;
-    required uint64 expiryTime = 3;
-    required string algorithm = 4;
-    required bytes encoded = 5;
-}
-
-message SCMGetSecretKeyRequest {
-    required UUID secretKeyId = 1;
-}
-
-message SCMGetCurrentSecretKeyResponse {
-    required ManagedSecretKey secretKey = 1;
-}
-
-message SCMGetSecretKeyResponse {
-    optional ManagedSecretKey secretKey = 1;
-}
-
-message SCMSecretKeysListResponse {
-    repeated ManagedSecretKey secretKeys = 1;
-}
-
-
-
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ManagedSecretKeyCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ManagedSecretKeyCodec.java
index 384d818762..32705bb2a7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ManagedSecretKeyCodec.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/io/ManagedSecretKeyCodec.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.hdds.scm.ha.io;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos;
 import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 
 /**
@@ -37,8 +37,8 @@ public class ManagedSecretKeyCodec implements Codec {
   @Override
   public Object deserialize(Class<?> type, ByteString value)
       throws InvalidProtocolBufferException {
-    SCMSecurityProtocolProtos.ManagedSecretKey message =
-        SCMSecurityProtocolProtos.ManagedSecretKey.parseFrom(value);
+    SCMSecretKeyProtocolProtos.ManagedSecretKey message =
+        SCMSecretKeyProtocolProtos.ManagedSecretKey.parseFrom(value);
     return ManagedSecretKey.fromProtobuf(message);
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecretKeyProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecretKeyProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000000..503e567ca7
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecretKeyProtocolServerSideTranslatorPB.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.protocol;
+
+import com.google.protobuf.ProtocolMessageEnum;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.protocol.SCMSecretKeyProtocol;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMGetCurrentSecretKeyResponse;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMGetSecretKeyRequest;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMGetSecretKeyResponse;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyRequest;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.SCMSecretKeyResponse;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos.Status;
+import org.apache.hadoop.hdds.protocolPB.SCMSecretKeyProtocolDatanodePB;
+import org.apache.hadoop.hdds.protocolPB.SCMSecretKeyProtocolOmPB;
+import org.apache.hadoop.hdds.scm.ha.RatisUtil;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.exception.SCMSecretKeyException;
+import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
+import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
+import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
+import org.apache.hadoop.util.ProtobufUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Server-side translator that forwards secret key requests received on
+ * {@link SCMSecretKeyProtocolDatanodePB} or {@link SCMSecretKeyProtocolOmPB}.
+ */
+public class SCMSecretKeyProtocolServerSideTranslatorPB
+    implements SCMSecretKeyProtocolDatanodePB, SCMSecretKeyProtocolOmPB {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMSecretKeyProtocolServerSideTranslatorPB.class);
+
+  private final SCMSecretKeyProtocol impl;
+  private final StorageContainerManager scm;
+
+  private OzoneProtocolMessageDispatcher<SCMSecretKeyRequest,
+      SCMSecretKeyResponse, ProtocolMessageEnum> dispatcher;
+
+  public SCMSecretKeyProtocolServerSideTranslatorPB(SCMSecretKeyProtocol impl,
+      StorageContainerManager storageContainerManager,
+      ProtocolMessageMetrics messageMetrics) {
+    this.impl = impl;
+    this.scm = storageContainerManager;
+    this.dispatcher =
+        new OzoneProtocolMessageDispatcher<>("SCMSecretKeyProtocol",
+            messageMetrics, LOG);
+  }
+
+  @Override
+  public SCMSecretKeyResponse submitRequest(RpcController controller,
+      SCMSecretKeyRequest request) throws ServiceException {
+    if (!scm.checkLeader()) {
+      RatisUtil.checkRatisException(
+          scm.getScmHAManager().getRatisServer().triggerNotLeaderException(),
+          scm.getSecurityProtocolRpcPort(), scm.getScmId());
+    }
+    return dispatcher.processRequest(request, this::processRequest,
+        request.getCmdType(), request.getTraceID());
+  }
+
+  public SCMSecretKeyResponse processRequest(SCMSecretKeyRequest request)
+      throws ServiceException {
+    SCMSecretKeyResponse.Builder scmSecurityResponse =
+        SCMSecretKeyResponse.newBuilder().setCmdType(request.getCmdType())
+            .setStatus(Status.OK);
+    try {
+      switch (request.getCmdType()) {
+      case GetCurrentSecretKey:
+        return scmSecurityResponse
+            .setCurrentSecretKeyResponseProto(getCurrentSecretKey())
+            .build();
+
+      case GetSecretKey:
+        return scmSecurityResponse.setGetSecretKeyResponseProto(
+                getSecretKey(request.getGetSecretKeyRequest()))
+            .build();
+
+      case GetAllSecretKeys:
+        return scmSecurityResponse
+            .setSecretKeysListResponseProto(getAllSecretKeys())
+            .build();
+
+      default:
+        throw new IllegalArgumentException(
+            "Unknown request type: " + request.getCmdType());
+      }
+    } catch (IOException e) {
+      RatisUtil.checkRatisException(e, scm.getSecurityProtocolRpcPort(),
+          scm.getScmId());
+      scmSecurityResponse.setSuccess(false);
+      scmSecurityResponse.setStatus(exceptionToResponseStatus(e));
+      // Prefer the exception's own message; if it is absent, fall back to
+      // the message of the underlying cause, when available.
+      if (e.getMessage() != null) {
+        scmSecurityResponse.setMessage(e.getMessage());
+      } else {
+        if (e.getCause() != null && e.getCause().getMessage() != null) {
+          scmSecurityResponse.setMessage(e.getCause().getMessage());
+        }
+      }
+      return scmSecurityResponse.build();
+    }
+  }
+
+  private SCMSecretKeyProtocolProtos.SCMSecretKeysListResponse
+      getAllSecretKeys() throws IOException {
+    SCMSecretKeyProtocolProtos.SCMSecretKeysListResponse.Builder builder =
+        SCMSecretKeyProtocolProtos.SCMSecretKeysListResponse.newBuilder();
+    impl.getAllSecretKeys()
+        .stream().map(ManagedSecretKey::toProtobuf)
+        .forEach(builder::addSecretKeys);
+    return builder.build();
+  }
+
+  private SCMGetSecretKeyResponse getSecretKey(
+      SCMGetSecretKeyRequest getSecretKeyRequest) throws IOException {
+    SCMGetSecretKeyResponse.Builder builder =
+        SCMGetSecretKeyResponse.newBuilder();
+    UUID id = ProtobufUtils.fromProtobuf(getSecretKeyRequest.getSecretKeyId());
+    ManagedSecretKey secretKey = impl.getSecretKey(id);
+    if (secretKey != null) {
+      builder.setSecretKey(secretKey.toProtobuf());
+    }
+    return builder.build();
+  }
+
+  private SCMGetCurrentSecretKeyResponse getCurrentSecretKey()
+      throws IOException {
+    return SCMGetCurrentSecretKeyResponse.newBuilder()
+        .setSecretKey(impl.getCurrentSecretKey().toProtobuf())
+        .build();
+  }
+
+  private Status exceptionToResponseStatus(IOException ex) {
+    if (ex instanceof SCMSecretKeyException) {
+      return Status.values()[
+          ((SCMSecretKeyException) ex).getErrorCode().ordinal()];
+    } else {
+      return Status.INTERNAL_ERROR;
+    }
+  }
+
+}
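
The matching client-side translator (SCMSecretKeyProtocolClientSideTranslatorPB
in the file list) is not shown in this mail. Purely to illustrate the wire
contract implied by processRequest(), a GetCurrentSecretKey round trip would
presumably look like the sketch below; the builder setters, the Type constant,
the null RpcController and the rpcProxy/traceId names are inferred or invented
here, not taken from the commit:

  // Hypothetical client-side sketch; setter names mirror the getters used
  // by processRequest() above.
  ManagedSecretKey fetchCurrentSecretKey() throws ServiceException {
    SCMSecretKeyRequest request = SCMSecretKeyRequest.newBuilder()
        .setCmdType(Type.GetCurrentSecretKey)
        .setTraceID(traceId)      // assumed to be a plain trace-id string
        .build();
    SCMSecretKeyResponse response = rpcProxy.submitRequest(null, request);
    // A real client maps non-OK statuses back to SCMSecretKeyException,
    // i.e. the inverse of exceptionToResponseStatus() above.
    return ManagedSecretKey.fromProtobuf(
        response.getCurrentSecretKeyResponseProto().getSecretKey());
  }
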
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
index 77529f54e3..056247bfba 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
@@ -27,20 +27,16 @@ import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCer
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertificateRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCrlsRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCrlsResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCurrentSecretKeyResponse;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetDataNodeCertRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetCertRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetLatestCrlIdRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetLatestCrlIdResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetOMCertRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSCMCertRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSecretKeyRequest;
-import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMGetSecretKeyResponse;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCertificateRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMListCertificateResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMRevokeCertificatesRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMRevokeCertificatesResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecretKeysListResponse;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityRequest;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityResponse;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Status;
@@ -162,21 +158,6 @@ public class SCMSecurityProtocolServerSideTranslatorPB
             getCertificate(request.getGetCertRequest()))
             .build();
 
-      case GetCurrentSecretKey:
-        return scmSecurityResponse
-            .setCurrentSecretKeyResponseProto(getCurrentSecretKey())
-            .build();
-
-      case GetSecretKey:
-        return scmSecurityResponse.setGetSecretKeyResponseProto(
-                getSecretKey(request.getGetSecretKeyRequest()))
-            .build();
-
-      case GetAllSecretKeys:
-        return scmSecurityResponse
-            .setSecretKeysListResponseProto(getAllSecretKeys())
-            .build();
-
       default:
         throw new IllegalArgumentException(
             "Unknown request type: " + request.getCmdType());
@@ -199,34 +180,6 @@ public class SCMSecurityProtocolServerSideTranslatorPB
     }
   }
 
-  private SCMSecretKeysListResponse getAllSecretKeys() throws IOException {
-    SCMSecretKeysListResponse.Builder builder =
-        SCMSecretKeysListResponse.newBuilder();
-    impl.getAllSecretKeys()
-        .stream().map(ManagedSecretKey::toProtobuf)
-        .forEach(builder::addSecretKeys);
-    return builder.build();
-  }
-
-  private SCMGetSecretKeyResponse getSecretKey(
-      SCMGetSecretKeyRequest getSecretKeyRequest) throws IOException {
-    SCMGetSecretKeyResponse.Builder builder =
-        SCMGetSecretKeyResponse.newBuilder();
-    UUID id = ProtobufUtils.fromProtobuf(getSecretKeyRequest.getSecretKeyId());
-    ManagedSecretKey secretKey = impl.getSecretKey(id);
-    if (secretKey != null) {
-      builder.setSecretKey(secretKey.toProtobuf());
-    }
-    return builder.build();
-  }
-
-  private SCMGetCurrentSecretKeyResponse getCurrentSecretKey()
-      throws IOException {
-    return SCMGetCurrentSecretKeyResponse.newBuilder()
-        .setSecretKey(impl.getCurrentSecretKey().toProtobuf())
-        .build();
-  }
-
   /**
    * Convert exception to corresponsing status.
    * @param ex
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
index e3fd7d416b..d024816722 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
@@ -38,21 +38,26 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.SCMSecretKeyProtocol;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.OzoneManagerDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmNodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto.SCMSecretKeyProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos;
+import org.apache.hadoop.hdds.protocolPB.SCMSecretKeyProtocolDatanodePB;
+import org.apache.hadoop.hdds.protocolPB.SCMSecretKeyProtocolOmPB;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
+import org.apache.hadoop.hdds.scm.protocol.SCMSecretKeyProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.update.server.SCMUpdateServiceGrpcServer;
 import org.apache.hadoop.hdds.scm.update.client.UpdateServiceConfig;
 import org.apache.hadoop.hdds.scm.update.server.SCMCRLStore;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.protocol.SCMSecurityProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdds.security.exception.SCMSecretKeyException;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
-import org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode;
 import org.apache.hadoop.hdds.security.symmetric.ManagedSecretKey;
 import org.apache.hadoop.hdds.security.symmetric.SecretKeyManager;
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
@@ -75,6 +80,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import static org.apache.hadoop.hdds.security.exception.SCMSecretKeyException.ErrorCode.SECRET_KEY_NOT_ENABLED;
+import static org.apache.hadoop.hdds.security.exception.SCMSecretKeyException.ErrorCode.SECRET_KEY_NOT_INITIALIZED;
 import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.CERTIFICATE_NOT_FOUND;
 import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CA_CERT_FAILED;
 import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CERTIFICATE_FAILED;
@@ -87,7 +94,8 @@ import static org.apache.hadoop.hdds.security.x509.certificate.authority.Certifi
 @KerberosInfo(
     serverPrincipal = ScmConfig.ConfigStrings.HDDS_SCM_KERBEROS_PRINCIPAL_KEY)
 @InterfaceAudience.Private
-public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
+public class SCMSecurityProtocolServer implements SCMSecurityProtocol,
+    SCMSecretKeyProtocol {
 
   private static final Logger LOGGER = LoggerFactory
       .getLogger(SCMSecurityProtocolServer.class);
@@ -131,6 +139,10 @@ public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
             .newReflectiveBlockingService(
                 new SCMSecurityProtocolServerSideTranslatorPB(this,
                     scm, metrics));
+//    BlockingService secretKeyService =
+//        SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService.newReflectiveBlockingService(
+//            new SCMSecretKeyProtocolServerSideTranslatorPB(this, scm, metrics)
+//        );
     this.rpcServer =
         StorageContainerManager.startRpcServer(
             conf,
@@ -138,6 +150,10 @@ public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
             SCMSecurityProtocolPB.class,
             secureProtoPbService,
             handlerCount);
+//    HddsServerUtil.addPBProtocol(conf, SCMSecretKeyProtocolDatanodePB.class,
+//        secretKeyService, rpcServer);
+//    HddsServerUtil.addPBProtocol(conf, SCMSecretKeyProtocolOmPB.class,
+//        secretKeyService, rpcServer);
     if (conf.getBoolean(
         CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
       rpcServer.refreshServiceAcl(conf, SCMPolicyProvider.getInstance());
@@ -177,33 +193,33 @@ public class SCMSecurityProtocolServer implements SCMSecurityProtocol {
   }
 
   @Override
-  public ManagedSecretKey getCurrentSecretKey() throws SCMSecurityException {
+  public ManagedSecretKey getCurrentSecretKey() throws SCMSecretKeyException {
     validateSecretKeyStatus();
     return secretKeyManager.getCurrentSecretKey();
   }
 
   @Override
-  public ManagedSecretKey getSecretKey(UUID id) throws SCMSecurityException {
+  public ManagedSecretKey getSecretKey(UUID id) throws SCMSecretKeyException {
     validateSecretKeyStatus();
     return secretKeyManager.getSecretKey(id);
   }
 
   @Override
-  public List<ManagedSecretKey> getAllSecretKeys() throws SCMSecurityException {
+  public List<ManagedSecretKey> getAllSecretKeys() throws SCMSecretKeyException {
     validateSecretKeyStatus();
     return secretKeyManager.getSortedKeys();
   }
 
-  private void validateSecretKeyStatus() throws SCMSecurityException {
+  private void validateSecretKeyStatus() throws SCMSecretKeyException {
     if (secretKeyManager == null) {
-      throw new SCMSecurityException("Secret keys are not enabled.",
-          ErrorCode.SECRET_KEY_NOT_ENABLED);
+      throw new SCMSecretKeyException("Secret keys are not enabled.",
+          SECRET_KEY_NOT_ENABLED);
     }
 
     if (!secretKeyManager.isInitialized()) {
-      throw new SCMSecurityException(
+      throw new SCMSecretKeyException(
           "Secret key initialization is not finished yet.",
-          ErrorCode.SECRET_KEY_NOT_INITIALIZED);
+          SECRET_KEY_NOT_INITIALIZED);
     }
   }
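
The commented-out blocks above show how the new service is meant to be wired
in once the wip settles. Uncommented, and assuming HddsServerUtil.addPBProtocol
has the signature used in those lines, the registration would presumably read:

    BlockingService secretKeyService =
        SCMSecretKeyProtocolProtos.SCMSecretKeyProtocolService
            .newReflectiveBlockingService(
                new SCMSecretKeyProtocolServerSideTranslatorPB(this, scm,
                    metrics));
    // (this.rpcServer is created via StorageContainerManager.startRpcServer
    // in between, exactly as in the existing code.)
    // Expose the same blocking service under both the datanode-facing and
    // the OM-facing protocol interfaces on the security RPC server.
    HddsServerUtil.addPBProtocol(conf, SCMSecretKeyProtocolDatanodePB.class,
        secretKeyService, rpcServer);
    HddsServerUtil.addPBProtocol(conf, SCMSecretKeyProtocolOmPB.class,
        secretKeyService, rpcServer);
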
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
index 84423cabac..85d086eec1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecretKeysApi.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.conf.DefaultConfigManager;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.SCMSecretKeyProtocol;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.scm.ScmConfig;
 import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
@@ -192,20 +193,20 @@ public final class TestSecretKeysApi {
     conf.set(HDDS_SECRET_KEY_EXPIRY_DURATION, "3000ms");
 
     startCluster();
-    SCMSecurityProtocol securityProtocol = getScmSecurityProtocol();
+    SCMSecretKeyProtocol secretKeyProtocol = getScmSecurityProtocol();
 
     // start the test when keys are full.
     GenericTestUtils.waitFor(() -> {
       try {
-        return securityProtocol.getAllSecretKeys().size() >= 3;
+        return secretKeyProtocol.getAllSecretKeys().size() >= 3;
       } catch (IOException ex) {
         throw new RuntimeException(ex);
       }
     }, 100, 4_000);
 
-    ManagedSecretKey initialKey = securityProtocol.getCurrentSecretKey();
+    ManagedSecretKey initialKey = secretKeyProtocol.getCurrentSecretKey();
     assertNotNull(initialKey);
-    List<ManagedSecretKey> initialKeys = securityProtocol.getAllSecretKeys();
+    List<ManagedSecretKey> initialKeys = secretKeyProtocol.getAllSecretKeys();
     assertEquals(initialKey, initialKeys.get(0));
     ManagedSecretKey lastKey = initialKeys.get(initialKeys.size() - 1);
 
@@ -215,14 +216,14 @@ public final class TestSecretKeysApi {
     // wait for the next rotation.
     GenericTestUtils.waitFor(() -> {
       try {
-        ManagedSecretKey newCurrentKey = securityProtocol.getCurrentSecretKey();
+        ManagedSecretKey newCurrentKey = secretKeyProtocol.getCurrentSecretKey();
         return !newCurrentKey.equals(initialKey);
       } catch (IOException ex) {
         throw new RuntimeException(ex);
       }
     }, 100, 1500);
-    ManagedSecretKey  updatedKey = securityProtocol.getCurrentSecretKey();
-    List<ManagedSecretKey>  updatedKeys = securityProtocol.getAllSecretKeys();
+    ManagedSecretKey  updatedKey = secretKeyProtocol.getCurrentSecretKey();
+    List<ManagedSecretKey>  updatedKeys = secretKeyProtocol.getAllSecretKeys();
 
     LOG.info("Updated active key: {}", updatedKey);
     LOG.info("Updated keys: {}", updatedKeys);
@@ -234,10 +235,10 @@ public final class TestSecretKeysApi {
     assertFalse(updatedKeys.contains(lastKey));
 
     // assert getSecretKey by ID.
-    ManagedSecretKey keyById = securityProtocol.getSecretKey(
+    ManagedSecretKey keyById = secretKeyProtocol.getSecretKey(
         updatedKey.getId());
     assertNotNull(keyById);
-    ManagedSecretKey nonExisting = securityProtocol.getSecretKey(
+    ManagedSecretKey nonExisting = secretKeyProtocol.getSecretKey(
         UUID.randomUUID());
     assertNull(nonExisting);
   }
@@ -248,18 +249,18 @@ public final class TestSecretKeysApi {
   @Test
   public void testSecretKeyApiNotEnabled() throws Exception {
     startCluster();
-    SCMSecurityProtocol securityProtocol = getScmSecurityProtocol();
+    SCMSecretKeyProtocol secretKeyProtocol = getScmSecurityProtocol();
 
     SCMSecurityException ex = assertThrows(SCMSecurityException.class,
-            securityProtocol::getCurrentSecretKey);
+            secretKeyProtocol::getCurrentSecretKey);
     assertEquals(SECRET_KEY_NOT_ENABLED, ex.getErrorCode());
 
     ex = assertThrows(SCMSecurityException.class,
-        () -> securityProtocol.getSecretKey(UUID.randomUUID()));
+        () -> secretKeyProtocol.getSecretKey(UUID.randomUUID()));
     assertEquals(SECRET_KEY_NOT_ENABLED, ex.getErrorCode());
 
     ex = assertThrows(SCMSecurityException.class,
-        securityProtocol::getAllSecretKeys);
+        secretKeyProtocol::getAllSecretKeys);
     assertEquals(SECRET_KEY_NOT_ENABLED, ex.getErrorCode());
   }
 
@@ -276,7 +277,7 @@ public final class TestSecretKeysApi {
     conf.set(HDDS_SECRET_KEY_EXPIRY_DURATION, "7d");
 
     startCluster();
-    SCMSecurityProtocol securityProtocol = getScmSecurityProtocol();
+    SCMSecretKeyProtocol securityProtocol = getScmSecurityProtocol();
     List<ManagedSecretKey> keysInitial = securityProtocol.getAllSecretKeys();
     LOG.info("Keys before fail over: {}.", keysInitial);
 
@@ -311,13 +312,13 @@ public final class TestSecretKeysApi {
   }
 
   @NotNull
-  private SCMSecurityProtocol getScmSecurityProtocol() throws IOException {
+  private SCMSecretKeyProtocol getScmSecurityProtocol() throws IOException {
     UserGroupInformation ugi =
         UserGroupInformation.loginUserFromKeytabAndReturnUGI(
             testUserPrincipal, testUserKeytab.getCanonicalPath());
     ugi.setAuthenticationMethod(KERBEROS);
-    SCMSecurityProtocol scmSecurityProtocolClient =
-        HddsServerUtil.getScmSecurityClient(conf, ugi);
+    SCMSecretKeyProtocol scmSecurityProtocolClient =
+        HddsServerUtil.getScmSecretKeyClient(conf, ugi);
     assertNotNull(scmSecurityProtocolClient);
     return scmSecurityProtocolClient;
   }
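
One loose end worth noting: testSecretKeyApiNotEnabled above still asserts
SCMSecurityException and SECRET_KEY_NOT_ENABLED even though the server side
now throws SCMSecretKeyException, presumably because the client-side mapping
is part of the remaining wip. Once that lands, the assertions would likely
tighten to something like this (a sketch, assuming the client surfaces
SCMSecretKeyException and its ErrorCode directly):

    SCMSecretKeyException ex = assertThrows(SCMSecretKeyException.class,
        secretKeyProtocol::getCurrentSecretKey);
    assertEquals(SCMSecretKeyException.ErrorCode.SECRET_KEY_NOT_ENABLED,
        ex.getErrorCode());
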

