Posted to commits@ozone.apache.org by el...@apache.org on 2020/04/03 11:35:05 UTC

[hadoop-ozone] branch master updated: HDDS-3239. Provide message-level metrics from the generic protocol dispatch

This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ceb514  HDDS-3239. Provide message-level metrics from the generic protocol dispatch
1ceb514 is described below

commit 1ceb51477e0bb7f44204ba8ea98d7e409277a748
Author: Elek Márton <el...@apache.org>
AuthorDate: Fri Apr 3 13:30:59 2020 +0200

    HDDS-3239. Provide message-level metrics from the generic protocol dispatch
    
    Closes #708
---
 .../container/common/impl/HddsDispatcher.java      |  75 ++++++-------
 .../ozone/container/keyvalue/KeyValueHandler.java  |  98 ++++++++---------
 ...inerDatanodeProtocolServerSideTranslatorPB.java |   3 +-
 .../container/keyvalue/TestKeyValueHandler.java    | 116 ++++++++++++---------
 .../server/OzoneProtocolMessageDispatcher.java     |  11 +-
 .../hadoop/hdds/utils/ProtocolMessageMetrics.java  |  42 +++++---
 ...lockLocationProtocolServerSideTranslatorPB.java |   4 +-
 ...inerLocationProtocolServerSideTranslatorPB.java |   4 +-
 .../hdds/scm/server/SCMBlockProtocolServer.java    |   6 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |  75 ++++++-------
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |  21 ++--
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  56 +++++-----
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   4 +-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |  13 +--
 .../recon/scm/ReconDatanodeProtocolServer.java     |   4 +-
 15 files changed, 285 insertions(+), 247 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index cef1c8f..0ea8189 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -18,23 +18,28 @@
 
 package org.apache.hadoop.ozone.container.common.impl;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerDataProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerAction;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .ContainerNotOpenException;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .InvalidContainerStateException;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.InvalidContainerStateException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 import org.apache.hadoop.ozone.audit.AuditAction;
 import org.apache.hadoop.ozone.audit.AuditEventStatus;
 import org.apache.hadoop.ozone.audit.AuditLogger;
@@ -42,40 +47,25 @@ import org.apache.hadoop.ozone.audit.AuditLoggerType;
 import org.apache.hadoop.ozone.audit.AuditMarker;
 import org.apache.hadoop.ozone.audit.AuditMessage;
 import org.apache.hadoop.ozone.audit.Auditor;
-import org.apache.hadoop.ozone.container.common.helpers
-    .ContainerCommandRequestPBHelper;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerCommandRequestPBHelper;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis
-    .DispatcherContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    ContainerDataProto.State;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-
-import io.opentracing.Scope;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import io.opentracing.Scope;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
+import org.apache.ratis.thirdparty.com.google.protobuf.ProtocolMessageEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Ozone Container dispatcher takes a call from the netty server and routes it
@@ -92,6 +82,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
   private final VolumeSet volumeSet;
   private final StateContext context;
   private final float containerCloseThreshold;
+  private final ProtocolMessageMetrics<ProtocolMessageEnum> protocolMetrics;
   private String scmID;
   private ContainerMetrics metrics;
   private final TokenVerifier tokenVerifier;
@@ -118,14 +109,22 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
         HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED,
         HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT);
     this.tokenVerifier = tokenVerifier;
+
+    protocolMetrics =
+        new ProtocolMessageMetrics<ProtocolMessageEnum>(
+            "HddsDispatcher",
+            "HDDS dispatcher metrics",
+            ContainerProtos.Type.values());
   }
 
   @Override
   public void init() {
+    protocolMetrics.register();
   }
 
   @Override
   public void shutdown() {
+    protocolMetrics.unregister();
   }
 
   /**
@@ -157,9 +156,13 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
   public ContainerCommandResponseProto dispatch(
       ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) {
     String spanName = "HddsDispatcher." + msg.getCmdType().name();
+    long startTime = System.nanoTime();
     try (Scope scope = TracingUtil
         .importAndCreateScope(spanName, msg.getTraceID())) {
       return dispatchRequest(msg, dispatcherContext);
+    } finally {
+      protocolMetrics
+          .increment(msg.getCmdType(), System.nanoTime() - startTime);
     }
   }
 
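The dispatch() change above is the core of the patch: every container command is timed with System.nanoTime() and recorded per message type, and because the increment sits in a finally block, a request that throws is still counted and timed. Reduced to a self-contained sketch (PerTypeMetrics, CmdType and the String payload are illustrative stand-ins, not the Ozone classes):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class DispatchTimingSketch {

  // Per-key call counter plus accumulated elapsed time, mirroring the
  // counters/elapsedTimes pair kept by ProtocolMessageMetrics below.
  static final class PerTypeMetrics<K> {
    private final Map<K, AtomicLong> counts = new ConcurrentHashMap<>();
    private final Map<K, AtomicLong> nanos = new ConcurrentHashMap<>();

    void increment(K key, long durationNanos) {
      counts.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
      nanos.computeIfAbsent(key, k -> new AtomicLong()).addAndGet(durationNanos);
    }

    long count(K key) {
      return counts.getOrDefault(key, new AtomicLong()).get();
    }
  }

  enum CmdType { CreateContainer, ReadChunk, WriteChunk }

  private final PerTypeMetrics<CmdType> metrics = new PerTypeMetrics<>();

  String dispatch(CmdType cmdType) {
    long startTime = System.nanoTime();
    try {
      return handle(cmdType); // the real work
    } finally {
      // Incremented in finally: the message is counted and timed even
      // when handle() throws.
      metrics.increment(cmdType, System.nanoTime() - startTime);
    }
  }

  private String handle(CmdType cmdType) {
    return "OK:" + cmdType;
  }

  public static void main(String[] args) {
    DispatchTimingSketch d = new DispatchTimingSketch();
    d.dispatch(CmdType.ReadChunk);
    System.out.println(d.metrics.count(CmdType.ReadChunk)); // prints 1
  }
}
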
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index a329bdb..15177fc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -34,26 +34,19 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerDataProto.State;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .GetSmallFileRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.GetSmallFileRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .PutSmallFileRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutSmallFileRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.ozone.common.ChunkBuffer;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
@@ -65,28 +58,29 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis
-    .DispatcherContext;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis
-    .DispatcherContext.WriteChunkStage;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume
-    .RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory;
 import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory;
 import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import static org.apache.hadoop.hdds.HddsConfigKeys
-    .HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
-    Result.*;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CLOSED_CONTAINER_IO;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_OPEN_CONTAINER;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockResponseSuccess;
@@ -98,7 +92,7 @@ import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuil
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess;
 import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
-
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -154,45 +148,55 @@ public class KeyValueHandler extends Handler {
       ContainerCommandRequestProto request, Container container,
       DispatcherContext dispatcherContext) {
 
+    return KeyValueHandler
+        .dispatchRequest(this, request, (KeyValueContainer) container,
+            dispatcherContext);
+  }
+
+  @VisibleForTesting
+  static ContainerCommandResponseProto dispatchRequest(KeyValueHandler handler,
+      ContainerCommandRequestProto request, KeyValueContainer kvContainer,
+      DispatcherContext dispatcherContext) {
     Type cmdType = request.getCmdType();
-    KeyValueContainer kvContainer = (KeyValueContainer) container;
+
     switch(cmdType) {
     case CreateContainer:
-      return handleCreateContainer(request, kvContainer);
+      return handler.handleCreateContainer(request, kvContainer);
     case ReadContainer:
-      return handleReadContainer(request, kvContainer);
+      return handler.handleReadContainer(request, kvContainer);
     case UpdateContainer:
-      return handleUpdateContainer(request, kvContainer);
+      return handler.handleUpdateContainer(request, kvContainer);
     case DeleteContainer:
-      return handleDeleteContainer(request, kvContainer);
+      return handler.handleDeleteContainer(request, kvContainer);
     case ListContainer:
-      return handleUnsupportedOp(request);
+      return handler.handleUnsupportedOp(request);
     case CloseContainer:
-      return handleCloseContainer(request, kvContainer);
+      return handler.handleCloseContainer(request, kvContainer);
     case PutBlock:
-      return handlePutBlock(request, kvContainer, dispatcherContext);
+      return handler.handlePutBlock(request, kvContainer, dispatcherContext);
     case GetBlock:
-      return handleGetBlock(request, kvContainer);
+      return handler.handleGetBlock(request, kvContainer);
     case DeleteBlock:
-      return handleDeleteBlock(request, kvContainer);
+      return handler.handleDeleteBlock(request, kvContainer);
     case ListBlock:
-      return handleUnsupportedOp(request);
+      return handler.handleUnsupportedOp(request);
     case ReadChunk:
-      return handleReadChunk(request, kvContainer, dispatcherContext);
+      return handler.handleReadChunk(request, kvContainer, dispatcherContext);
     case DeleteChunk:
-      return handleDeleteChunk(request, kvContainer);
+      return handler.handleDeleteChunk(request, kvContainer);
     case WriteChunk:
-      return handleWriteChunk(request, kvContainer, dispatcherContext);
+      return handler.handleWriteChunk(request, kvContainer, dispatcherContext);
     case ListChunk:
-      return handleUnsupportedOp(request);
+      return handler.handleUnsupportedOp(request);
     case CompactChunk:
-      return handleUnsupportedOp(request);
+      return handler.handleUnsupportedOp(request);
     case PutSmallFile:
-      return handlePutSmallFile(request, kvContainer, dispatcherContext);
+      return handler
+          .handlePutSmallFile(request, kvContainer, dispatcherContext);
     case GetSmallFile:
-      return handleGetSmallFile(request, kvContainer);
+      return handler.handleGetSmallFile(request, kvContainer);
     case GetCommittedBlockLength:
-      return handleGetCommittedBlockLength(request, kvContainer);
+      return handler.handleGetCommittedBlockLength(request, kvContainer);
     default:
       return null;
     }
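
The handle()/dispatchRequest() split above is a testability refactoring: the switch now lives in a static method that receives the handler explicitly, so unit tests can drive a Mockito mock through the real switch (see the TestKeyValueHandler changes below). The same shape, with illustrative names only, not the Ozone classes:

public class KvDispatchSketch {

  enum Type { CreateContainer, ReadContainer, ListContainer }

  String handle(Type cmdType) {
    // Instance entry point delegates to the static dispatcher.
    return dispatchRequest(this, cmdType);
  }

  static String dispatchRequest(KvDispatchSketch handler, Type cmdType) {
    switch (cmdType) {
    case CreateContainer:
      return handler.handleCreateContainer();
    case ReadContainer:
      return handler.handleReadContainer();
    case ListContainer:
      return handler.handleUnsupportedOp();
    default:
      return null;
    }
  }

  String handleCreateContainer() { return "created"; }
  String handleReadContainer() { return "read"; }
  String handleUnsupportedOp() { return "unsupported"; }
}
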
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index aa28f81..e99cbae 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 
+import com.google.protobuf.ProtocolMessageEnum;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.slf4j.Logger;
@@ -53,7 +54,7 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
 
   public StorageContainerDatanodeProtocolServerSideTranslatorPB(
       StorageContainerDatanodeProtocol impl,
-      ProtocolMessageMetrics protocolMessageMetrics) {
+      ProtocolMessageMetrics<ProtocolMessageEnum> protocolMessageMetrics) {
     this.impl = impl;
     dispatcher =
         new OzoneProtocolMessageDispatcher<>("SCMDatanodeProtocol",
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index d9e7f09..1f5c677 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.ozone.container.keyvalue;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
@@ -25,44 +29,38 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.scm.container.common.helpers
-    .StorageContainerException;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
 import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
 import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+import static org.junit.Assert.assertEquals;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestRule;
 import org.junit.rules.Timeout;
-
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.mockito.Mockito;
-
-import static org.apache.hadoop.hdds.HddsConfigKeys
-    .HDDS_DATANODE_VOLUME_CHOOSING_POLICY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
-import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyLong;
+import org.mockito.Mockito;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.times;
 
 
-import java.io.File;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.UUID;
-
 /**
  * Unit tests for {@link KeyValueHandler}.
  */
@@ -94,28 +92,27 @@ public class TestKeyValueHandler {
   public void setup() throws StorageContainerException {
     // Create mock HddsDispatcher and KeyValueHandler.
     handler = Mockito.mock(KeyValueHandler.class);
-    dispatcher = Mockito.mock(HddsDispatcher.class);
-    Mockito.when(dispatcher.getHandler(any())).thenReturn(handler);
-    Mockito.when(dispatcher.dispatch(any(), any())).thenCallRealMethod();
-    Mockito.when(dispatcher.getContainer(anyLong())).thenReturn(
-        Mockito.mock(KeyValueContainer.class));
-    Mockito.when(dispatcher.getMissingContainerSet())
-        .thenReturn(new HashSet<>());
-    Mockito.when(handler.handle(any(), any(), any())).thenCallRealMethod();
-    doCallRealMethod().when(dispatcher).setMetricsForTesting(any());
-    dispatcher.setMetricsForTesting(Mockito.mock(ContainerMetrics.class));
-    Mockito.when(dispatcher.buildAuditMessageForFailure(any(), any(), any()))
-        .thenCallRealMethod();
-    Mockito.when(dispatcher.buildAuditMessageForSuccess(any(), any()))
-        .thenCallRealMethod();
+
+    HashMap<ContainerType, Handler> handlers = new HashMap<>();
+    handlers.put(ContainerType.KeyValueContainer, handler);
+
+    dispatcher = new HddsDispatcher(
+        new OzoneConfiguration(),
+        Mockito.mock(ContainerSet.class),
+        Mockito.mock(VolumeSet.class),
+        handlers,
+        Mockito.mock(StateContext.class),
+        Mockito.mock(ContainerMetrics.class),
+        Mockito.mock(TokenVerifier.class)
+    );
+
   }
 
   /**
    * Test that Handler handles different command types correctly.
    */
-  @Test
-  public void testHandlerCommandHandling() {
-
+  public void testHandlerCommandHandling() throws Exception {
+    Mockito.reset(handler);
     // Test Create Container Request handling
     ContainerCommandRequestProto createContainerRequest =
         ContainerProtos.ContainerCommandRequestProto.newBuilder()
@@ -125,113 +122,132 @@ public class TestKeyValueHandler {
             .setCreateContainer(ContainerProtos.CreateContainerRequestProto
                 .getDefaultInstance())
             .build();
+
+    KeyValueContainer container = Mockito.mock(KeyValueContainer.class);
+
     DispatcherContext context = new DispatcherContext.Builder().build();
-    dispatcher.dispatch(createContainerRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, createContainerRequest, container, context);
     Mockito.verify(handler, times(1)).handleCreateContainer(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Read Container Request handling
     ContainerCommandRequestProto readContainerRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.ReadContainer);
-    dispatcher.dispatch(readContainerRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, readContainerRequest, container, context);
     Mockito.verify(handler, times(1)).handleReadContainer(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Update Container Request handling
     ContainerCommandRequestProto updateContainerRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.UpdateContainer);
-    dispatcher.dispatch(updateContainerRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, updateContainerRequest, container, context);
     Mockito.verify(handler, times(1)).handleUpdateContainer(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Delete Container Request handling
     ContainerCommandRequestProto deleteContainerRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.DeleteContainer);
-    dispatcher.dispatch(deleteContainerRequest, null);
+    KeyValueHandler
+        .dispatchRequest(handler, deleteContainerRequest, container, context);
     Mockito.verify(handler, times(1)).handleDeleteContainer(
         any(ContainerCommandRequestProto.class), any());
 
     // Test List Container Request handling
     ContainerCommandRequestProto listContainerRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.ListContainer);
-    dispatcher.dispatch(listContainerRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, listContainerRequest, container, context);
     Mockito.verify(handler, times(1)).handleUnsupportedOp(
         any(ContainerCommandRequestProto.class));
 
     // Test Close Container Request handling
     ContainerCommandRequestProto closeContainerRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.CloseContainer);
-    dispatcher.dispatch(closeContainerRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, closeContainerRequest, container, context);
     Mockito.verify(handler, times(1)).handleCloseContainer(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Put Block Request handling
     ContainerCommandRequestProto putBlockRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.PutBlock);
-    dispatcher.dispatch(putBlockRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, putBlockRequest, container, context);
     Mockito.verify(handler, times(1)).handlePutBlock(
         any(ContainerCommandRequestProto.class), any(), any());
 
     // Test Get Block Request handling
     ContainerCommandRequestProto getBlockRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.GetBlock);
-    dispatcher.dispatch(getBlockRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, getBlockRequest, container, context);
     Mockito.verify(handler, times(1)).handleGetBlock(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Delete Block Request handling
     ContainerCommandRequestProto deleteBlockRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.DeleteBlock);
-    dispatcher.dispatch(deleteBlockRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, deleteBlockRequest, container, context);
     Mockito.verify(handler, times(1)).handleDeleteBlock(
         any(ContainerCommandRequestProto.class), any());
 
     // Test List Block Request handling
     ContainerCommandRequestProto listBlockRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.ListBlock);
-    dispatcher.dispatch(listBlockRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, listBlockRequest, container, context);
     Mockito.verify(handler, times(2)).handleUnsupportedOp(
         any(ContainerCommandRequestProto.class));
 
     // Test Read Chunk Request handling
     ContainerCommandRequestProto readChunkRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.ReadChunk);
-    dispatcher.dispatch(readChunkRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, readChunkRequest, container, context);
     Mockito.verify(handler, times(1)).handleReadChunk(
         any(ContainerCommandRequestProto.class), any(), any());
 
     // Test Delete Chunk Request handling
     ContainerCommandRequestProto deleteChunkRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.DeleteChunk);
-    dispatcher.dispatch(deleteChunkRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, deleteChunkRequest, container, context);
     Mockito.verify(handler, times(1)).handleDeleteChunk(
         any(ContainerCommandRequestProto.class), any());
 
     // Test Write Chunk Request handling
     ContainerCommandRequestProto writeChunkRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.WriteChunk);
-    dispatcher.dispatch(writeChunkRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, writeChunkRequest, container, context);
     Mockito.verify(handler, times(1)).handleWriteChunk(
         any(ContainerCommandRequestProto.class), any(), any());
 
     // Test List Chunk Request handling
     ContainerCommandRequestProto listChunkRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.ListChunk);
-    dispatcher.dispatch(listChunkRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, listChunkRequest, container, context);
     Mockito.verify(handler, times(3)).handleUnsupportedOp(
         any(ContainerCommandRequestProto.class));
 
     // Test Put Small File Request handling
     ContainerCommandRequestProto putSmallFileRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.PutSmallFile);
-    dispatcher.dispatch(putSmallFileRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, putSmallFileRequest, container, context);
     Mockito.verify(handler, times(1)).handlePutSmallFile(
         any(ContainerCommandRequestProto.class), any(), any());
 
     // Test Get Small File Request handling
     ContainerCommandRequestProto getSmallFileRequest =
         getDummyCommandRequestProto(ContainerProtos.Type.GetSmallFile);
-    dispatcher.dispatch(getSmallFileRequest, context);
+    KeyValueHandler
+        .dispatchRequest(handler, getSmallFileRequest, container, context);
     Mockito.verify(handler, times(1)).handleGetSmallFile(
         any(ContainerCommandRequestProto.class), any());
   }
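
The rewritten test no longer wires up a half-mocked HddsDispatcher; it calls the static KeyValueHandler.dispatchRequest() directly against a mocked handler and verifies which handleXxx() method the switch selected. A condensed sketch of that verification style, assuming JUnit 4 and Mockito on the classpath and reusing the hypothetical KvDispatchSketch from above (same package, so its package-private methods are visible):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.junit.Test;

public class KvDispatchSketchTest {

  @Test
  public void routesCreateToHandleCreateContainer() {
    // Mock the handler; dispatchRequest() still runs the real switch,
    // but the handleXxx() calls land on the mock.
    KvDispatchSketch handler = mock(KvDispatchSketch.class);

    KvDispatchSketch.dispatchRequest(
        handler, KvDispatchSketch.Type.CreateContainer);

    verify(handler, times(1)).handleCreateContainer();
  }
}
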
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
index fb13fe2..d52fbda 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
@@ -38,12 +38,14 @@ public class OzoneProtocolMessageDispatcher<REQUEST, RESPONSE> {
 
   private String serviceName;
 
-  private final ProtocolMessageMetrics protocolMessageMetrics;
+  private final ProtocolMessageMetrics<ProtocolMessageEnum>
+      protocolMessageMetrics;
 
   private Logger logger;
 
   public OzoneProtocolMessageDispatcher(String serviceName,
-      ProtocolMessageMetrics protocolMessageMetrics, Logger logger) {
+      ProtocolMessageMetrics<ProtocolMessageEnum> protocolMessageMetrics,
+      Logger logger) {
     this.serviceName = serviceName;
     this.protocolMessageMetrics = protocolMessageMetrics;
     this.logger = logger;
@@ -67,10 +69,13 @@ public class OzoneProtocolMessageDispatcher<REQUEST, RESPONSE> {
         logger.debug("{} {} request is received",
             serviceName, type.toString());
       }
-      protocolMessageMetrics.increment(type);
+
+      long startTime = System.nanoTime();
 
       RESPONSE response = methodCall.apply(request);
 
+      protocolMessageMetrics.increment(type, System.nanoTime() - startTime);
+
       if (logger.isTraceEnabled()) {
         logger.trace(
             "[service={}] [type={}] request is processed. Response: "
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/ProtocolMessageMetrics.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/ProtocolMessageMetrics.java
index 73c3f01..aa50eab 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/ProtocolMessageMetrics.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/ProtocolMessageMetrics.java
@@ -25,41 +25,43 @@ import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsInfo;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsTag;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-
-import com.google.protobuf.ProtocolMessageEnum;
+import org.apache.hadoop.metrics2.lib.Interns;
 
 /**
  * Metrics to count all the subtypes of a specific message.
  */
-public class ProtocolMessageMetrics implements MetricsSource {
+public class ProtocolMessageMetrics<KEY> implements MetricsSource {
 
   private String name;
 
   private String description;
 
-  private Map<ProtocolMessageEnum, AtomicLong> counters =
+  private Map<KEY, AtomicLong> counters =
+      new ConcurrentHashMap<>();
+
+  private Map<KEY, AtomicLong> elapsedTimes =
       new ConcurrentHashMap<>();
 
-  public static ProtocolMessageMetrics create(String name,
-      String description, ProtocolMessageEnum[] types) {
-    ProtocolMessageMetrics protocolMessageMetrics =
-        new ProtocolMessageMetrics(name, description,
-            types);
-    return protocolMessageMetrics;
+  public static <KEY> ProtocolMessageMetrics<KEY> create(String name,
+      String description, KEY[] types) {
+    return new ProtocolMessageMetrics<KEY>(name, description, types);
   }
 
   public ProtocolMessageMetrics(String name, String description,
-      ProtocolMessageEnum[] values) {
+      KEY[] values) {
     this.name = name;
     this.description = description;
-    for (ProtocolMessageEnum value : values) {
+    for (KEY value : values) {
       counters.put(value, new AtomicLong(0));
+      elapsedTimes.put(value, new AtomicLong(0));
     }
   }
 
-  public void increment(ProtocolMessageEnum key) {
+  public void increment(KEY key, long duration) {
     counters.get(key).incrementAndGet();
+    elapsedTimes.get(key).addAndGet(duration);
   }
 
   public void register() {
@@ -73,11 +75,19 @@ public class ProtocolMessageMetrics implements MetricsSource {
 
   @Override
   public void getMetrics(MetricsCollector collector, boolean all) {
-    MetricsRecordBuilder builder = collector.addRecord(name);
     counters.forEach((key, value) -> {
-      builder.addCounter(new MetricName(key.toString(), ""), value.longValue());
+      MetricsRecordBuilder builder =
+          collector.addRecord(name);
+      builder.add(
+          new MetricsTag(Interns.info("type", "Message type"), key.toString()));
+      builder.addCounter(new MetricName("counter", "Number of distinct calls"),
+          value.longValue());
+      builder.addCounter(
+          new MetricName("time", "Sum of the duration of the calls"),
+          elapsedTimes.get(key).longValue());
+      builder.endRecord();
+
     });
-    builder.endRecord();
   }
 
   /**
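
ProtocolMessageMetrics is now generic in its key type and keeps an elapsed-time sum beside each counter; getMetrics() emits one record per message type, tagged with type=<name> and carrying two counters (counter and time), instead of a single record holding one counter per type. A hypothetical usage, assuming the hadoop-hdds framework classes above are on the classpath:

import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;

public class MetricsUsageSketch {

  enum OpType { GET, PUT }

  public static void main(String[] args) {
    // Any key type works now, not only
    // com.google.protobuf.ProtocolMessageEnum.
    ProtocolMessageMetrics<OpType> metrics = ProtocolMessageMetrics
        .create("DemoProtocol", "Demo protocol metrics", OpType.values());

    long start = System.nanoTime();
    // ... handle a GET message ...
    metrics.increment(OpType.GET, System.nanoTime() - start);

    // register()/unregister() tie the source to the Hadoop metrics
    // system, as HddsDispatcher does in init()/shutdown().
  }
}
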
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index 851fa35..fb07351 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 
+import com.google.protobuf.ProtocolMessageEnum;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.slf4j.Logger;
@@ -77,7 +78,8 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
    */
   public ScmBlockLocationProtocolServerSideTranslatorPB(
       ScmBlockLocationProtocol impl,
-      ProtocolMessageMetrics metrics) throws IOException {
+      ProtocolMessageMetrics<ProtocolMessageEnum> metrics)
+      throws IOException {
     this.impl = impl;
     dispatcher = new OzoneProtocolMessageDispatcher<>(
         "BlockLocationProtocol", metrics, LOG);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index cb3a5b3..6207343 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
 import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 
+import com.google.protobuf.ProtocolMessageEnum;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.slf4j.Logger;
@@ -104,7 +105,8 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
    */
   public StorageContainerLocationProtocolServerSideTranslatorPB(
       StorageContainerLocationProtocol impl,
-      ProtocolMessageMetrics protocolMetrics) throws IOException {
+      ProtocolMessageMetrics<ProtocolMessageEnum> protocolMetrics)
+      throws IOException {
     this.impl = impl;
     this.dispatcher =
         new OzoneProtocolMessageDispatcher<>("ScmContainerLocation",
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index b88b54f..99f873f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTra
 
 import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
+import com.google.protobuf.ProtocolMessageEnum;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
@@ -86,7 +87,7 @@ public class SCMBlockProtocolServer implements
   private final OzoneConfiguration conf;
   private final RPC.Server blockRpcServer;
   private final InetSocketAddress blockRpcAddress;
-  private final ProtocolMessageMetrics
+  private final ProtocolMessageMetrics<ProtocolMessageEnum>
       protocolMessageMetrics;
 
   /**
@@ -104,7 +105,8 @@ public class SCMBlockProtocolServer implements
         ProtobufRpcEngine.class);
 
     protocolMessageMetrics =
-        ProtocolMessageMetrics.create("ScmBlockLocationProtocol",
+        ProtocolMessageMetrics.create(
+            "ScmBlockLocationProtocol",
             "SCM Block location protocol counters",
             ScmBlockLocationProtocolProtos.Type.values());
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index eb5e2c0..aa687c0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -21,37 +21,44 @@
  */
 package org.apache.hadoop.hdds.scm.server;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.protobuf.BlockingService;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
-import org.apache.hadoop.hdds.scm.safemode.SafeModeNotification;
-import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.ScmUtils;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.safemode.SafeModePrecheck;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.scm.safemode.SafeModeNotification;
+import org.apache.hadoop.hdds.scm.safemode.SafeModePrecheck;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
@@ -63,35 +70,21 @@ import org.apache.hadoop.ozone.audit.AuditLoggerType;
 import org.apache.hadoop.ozone.audit.AuditMessage;
 import org.apache.hadoop.ozone.audit.Auditor;
 import org.apache.hadoop.ozone.audit.SCMAction;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos
-    .StorageContainerLocationProtocolService.newReflectiveBlockingService;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_CLIENT_ADDRESS_KEY;
 
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_HANDLER_COUNT_DEFAULT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys
-    .OZONE_SCM_HANDLER_COUNT_KEY;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.ProtocolMessageEnum;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StorageContainerLocationProtocolService.newReflectiveBlockingService;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
 import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
-import static org.apache.hadoop.hdds.scm.server.StorageContainerManager
-    .startRpcServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The RPC server that listens to requests from clients.
@@ -107,7 +100,7 @@ public class SCMClientProtocolServer implements
   private final StorageContainerManager scm;
   private final OzoneConfiguration conf;
   private SafeModePrecheck safeModePrecheck;
-  private final ProtocolMessageMetrics protocolMetrics;
+  private final ProtocolMessageMetrics<ProtocolMessageEnum> protocolMetrics;
 
   public SCMClientProtocolServer(OzoneConfiguration conf,
       StorageContainerManager scm) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index fd38010..ad7f65a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -43,11 +43,12 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.ReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
+import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
@@ -68,25 +69,22 @@ import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.security.authorize.PolicyProvider;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
+import com.google.protobuf.ProtocolMessageEnum;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.closeContainerCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.closePipelineCommand;
+import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.createPipelineCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteBlocksCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reregisterCommand;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
-    .createPipelineCommand;
-import static org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
-    .closePipelineCommand;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
@@ -95,8 +93,6 @@ import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_REPORT;
 import static org.apache.hadoop.hdds.scm.server.StorageContainerManager.startRpcServer;
 import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
 import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
-
-import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,7 +117,7 @@ public class SCMDatanodeProtocolServer implements
   private final InetSocketAddress datanodeRpcAddress;
   private final SCMDatanodeHeartbeatDispatcher heartbeatDispatcher;
   private final EventPublisher eventPublisher;
-  private ProtocolMessageMetrics protocolMessageMetrics;
+  private ProtocolMessageMetrics<ProtocolMessageEnum> protocolMessageMetrics;
 
   public SCMDatanodeProtocolServer(final OzoneConfiguration conf,
                                    OzoneStorageContainerManager scm,
@@ -409,7 +405,8 @@ public class SCMDatanodeProtocolServer implements
    * Get the ProtocolMessageMetrics for this server.
    * @return ProtocolMessageMetrics
    */
-  protected ProtocolMessageMetrics getProtocolMessageMetrics() {
+  protected ProtocolMessageMetrics<ProtocolMessageEnum>
+        getProtocolMessageMetrics() {
     return ProtocolMessageMetrics
         .create("SCMDatanodeProtocol", "SCM Datanode protocol",
             StorageContainerDatanodeProtocolProtos.Type.values());
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 128aa3b..b43dd03 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -23,11 +23,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.utils.db.DBUpdatesWrapper;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
@@ -36,9 +36,9 @@ import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
 import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
@@ -61,25 +61,8 @@ import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetAclRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetAclResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListMultipartUploadsRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListMultipartUploadsResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneFileStatusProto;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupFileRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupFileResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AllocateBlockResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketArgs;
@@ -88,13 +71,22 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelD
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CommitKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateBucketRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateFileResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateKeyResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DBUpdatesResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteBucketRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteVolumeRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetAclRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetDelegationTokenResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetFileStatusResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetS3SecretRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetS3SecretResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.InfoBucketRequest;
@@ -106,12 +98,16 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListBuc
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListBucketsResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListKeysRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListKeysResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListTrashResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverTrashRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverTrashResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListMultipartUploadsRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListMultipartUploadsResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListStatusResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListTrashRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListTrashResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ListVolumeResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupFileRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupFileResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupKeyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.LookupKeyResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.MultipartCommitUploadPartRequest;
@@ -126,6 +122,9 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Multipa
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneFileStatusProto;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverTrashRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverTrashResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RemoveAclRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RemoveAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenameKeyRequest;
@@ -139,6 +138,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.S3ListB
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetAclResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetBucketPropertyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetVolumePropertyRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
@@ -152,23 +152,21 @@ import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequ
 import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.hdds.utils.db.DBUpdatesWrapper;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import static org.apache.hadoop.io.retry.RetryPolicy.RetryAction.FAILOVER_AND_RETRY;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.ACCESS_DENIED;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.DIRECTORY_ALREADY_EXISTS;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  *  The client side implementation of OzoneManagerProtocol.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 627be1f..1fccd7b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -168,6 +168,7 @@ import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.BlockingService;
+import com.google.protobuf.ProtocolMessageEnum;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
@@ -255,7 +256,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private S3BucketManager s3BucketManager;
 
   private final OMMetrics metrics;
-  private final ProtocolMessageMetrics omClientProtocolMetrics;
+  private final ProtocolMessageMetrics<ProtocolMessageEnum>
+      omClientProtocolMetrics;
   private OzoneManagerHttpServer httpServer;
   private final OMStorage omStorage;
   private final ScmBlockLocationProtocol scmBlockClient;
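
OzoneManager now declares the same parameterized type on its field instead of
the raw ProtocolMessageMetrics. A hedged sketch of how such a field might be
held and fed, reusing MessageMetricsSketch from the earlier sketch (DemoType,
DemoServer and handle() are invented for illustration, not OzoneManager API):

    enum DemoType { CREATE_VOLUME, CREATE_BUCKET, CREATE_KEY }

    final class DemoServer {

      // The parameterized field lets the compiler verify that only this
      // protocol's message enum is ever recorded against these metrics.
      private final MessageMetricsSketch<DemoType> clientProtocolMetrics =
          MessageMetricsSketch.create("DemoProtocol", "Demo client protocol",
              DemoType.values());

      void handle(DemoType messageType) {
        clientProtocolMetrics.increment(messageType);
        // ... dispatch to the real handler for this message type ...
      }
    }
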
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 7ee2e6e..7c6df32 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -16,6 +16,11 @@
  */
 package org.apache.hadoop.ozone.protocolPB;
 
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 import org.apache.hadoop.ozone.OmUtils;
@@ -31,6 +36,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 
+import com.google.protobuf.ProtocolMessageEnum;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -38,11 +44,6 @@ import org.apache.ratis.util.ExitUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
  * This class is the server-side translator that forwards requests received on
  * {@link OzoneManagerProtocolPB}
@@ -69,7 +70,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
   public OzoneManagerProtocolServerSideTranslatorPB(
       OzoneManager impl,
       OzoneManagerRatisServer ratisServer,
-      ProtocolMessageMetrics metrics,
+      ProtocolMessageMetrics<ProtocolMessageEnum> metrics,
       boolean enableRatis) {
     this.ozoneManager = impl;
     this.isRatisEnabled = enableRatis;
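
The translator receives the typed metrics through its constructor and, per the
commit subject, feeds them from the generic protocol dispatch: every incoming
message is recorded under its own message type before the typed handler runs.
A sketch of that dispatch shape, with the Handler interface and dispatch()
signature assumed for illustration (the real OzoneProtocolMessageDispatcher is
richer; this shows only the metrics hook):

    import java.io.IOException;

    interface Handler<REQ, RESP> {
      RESP handle(REQ request) throws IOException;
    }

    final class DispatcherSketch<E extends Enum<E>> {

      private final MessageMetricsSketch<E> metrics;

      DispatcherSketch(MessageMetricsSketch<E> metrics) {
        this.metrics = metrics;
      }

      // Count the message under its own type, then run the typed handler;
      // the metrics logic lives in one place instead of in every handler.
      <REQ, RESP> RESP dispatch(E type, REQ request,
          Handler<REQ, RESP> handler) throws IOException {
        metrics.increment(type);
        return handler.handle(request);
      }
    }
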
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDatanodeProtocolServer.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDatanodeProtocolServer.java
index 6d58931..a9cde11 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDatanodeProtocolServer.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDatanodeProtocolServer.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.ozone.protocol.ReconDatanodeProtocol;
 import org.apache.hadoop.ozone.protocolPB.ReconDatanodeProtocolPB;
 import org.apache.hadoop.security.authorize.PolicyProvider;
 
+import com.google.protobuf.ProtocolMessageEnum;
 import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_ADDRESS_KEY;
 
 /**
@@ -48,7 +49,8 @@ public class ReconDatanodeProtocolServer extends SCMDatanodeProtocolServer
   }
 
   @Override
-  public ProtocolMessageMetrics getProtocolMessageMetrics() {
+  public ProtocolMessageMetrics<ProtocolMessageEnum>
+      getProtocolMessageMetrics() {
     return ProtocolMessageMetrics
         .create("ReconDatanodeProtocol", "Recon Datanode protocol",
             StorageContainerDatanodeProtocolProtos.Type.values());
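
ReconDatanodeProtocolServer inherits the whole datanode RPC handling of
SCMDatanodeProtocolServer and overrides only the metrics identity. The same
pattern, sketched with the invented classes from the earlier notes:

    class BaseProtocolServerSketch {

      protected MessageMetricsSketch<DemoType> getProtocolMessageMetrics() {
        return MessageMetricsSketch.create("BaseProtocol", "Base protocol",
            DemoType.values());
      }
    }

    class ReconStyleServerSketch extends BaseProtocolServerSketch {

      // Only the metrics name and description differ; request handling is
      // inherited unchanged from the parent server.
      @Override
      protected MessageMetricsSketch<DemoType> getProtocolMessageMetrics() {
        return MessageMetricsSketch.create("ReconStyleProtocol",
            "Recon-style datanode protocol", DemoType.values());
      }
    }
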

