You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2019/09/26 00:30:33 UTC

[hadoop] branch trunk updated: HDDS-2067. Create generic service facade with tracing/metrics/logging support

This is an automated email from the ASF dual-hosted git repository.

aengineer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f647185  HDDS-2067. Create generic service facade with tracing/metrics/logging support
f647185 is described below

commit f647185905f6047fc9734b8aa37d6ef59b6082c2
Author: Márton Elek <el...@apache.org>
AuthorDate: Mon Sep 23 13:08:04 2019 +0200

    HDDS-2067. Create generic service facade with tracing/metrics/logging support
    
    Signed-off-by: Anu Engineer <ae...@apache.org>
    Co-Authored-By: Doroszlai, Attila <64...@users.noreply.github.com>
---
 .../function/FunctionWithServiceException.java     | 36 ++++++++
 .../apache/hadoop/hdds/function/package-info.java  | 22 +++++
 .../server/OzoneProtocolMessageDispatcher.java     | 88 ++++++++++++++++++++
 ...lockLocationProtocolServerSideTranslatorPB.java | 56 ++++---------
 ...inerLocationProtocolServerSideTranslatorPB.java | 96 ++++++++--------------
 .../hadoop/hdds/scm/protocol/package-info.java     | 21 +++++
 .../hdds/scm/server/SCMBlockProtocolServer.java    |  2 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |  3 +-
 .../scm/server/TestSCMBlockProtocolServer.java     |  3 +-
 .../scm/ScmProtocolBlockLocationInsight.java       |  2 +-
 ...OzoneManagerProtocolServerSideTranslatorPB.java | 41 ++-------
 11 files changed, 231 insertions(+), 139 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/function/FunctionWithServiceException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/function/FunctionWithServiceException.java
new file mode 100644
index 0000000..b9d7bce
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/function/FunctionWithServiceException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.function;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * Functional interface like java.util.function.Function but with
+ * checked exception.
+ */
+@FunctionalInterface
+public interface FunctionWithServiceException<T, R> {
+
+  /**
+   * Applies this function to the given argument.
+   *
+   * @param t the function argument
+   * @return the function result
+   */
+  R apply(T t) throws ServiceException;
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/function/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/function/package-info.java
new file mode 100644
index 0000000..915fe35
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/function/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Functional interfaces for ozone, similar to java.util.function.
+ */
+package org.apache.hadoop.hdds.function;
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
new file mode 100644
index 0000000..d67a759
--- /dev/null
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.server;
+
+import org.apache.hadoop.hdds.function.FunctionWithServiceException;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
+
+import com.google.protobuf.ProtocolMessageEnum;
+import com.google.protobuf.ServiceException;
+import io.opentracing.Scope;
+import org.slf4j.Logger;
+
+/**
+ * Dispatch message after tracing and message logging for insight.
+ * <p>
+ * This is a generic utility to dispatch message in ServerSide translators.
+ * <p>
+ * It logs the message type/content on DEBUG/TRACING log for insight and create
+ * a new span based on the tracing information.
+ */
+public class OzoneProtocolMessageDispatcher<REQUEST, RESPONSE> {
+
+  private String serviceName;
+
+  private final ProtocolMessageMetrics protocolMessageMetrics;
+
+  private Logger logger;
+
+  public OzoneProtocolMessageDispatcher(String serviceName,
+      ProtocolMessageMetrics protocolMessageMetrics, Logger logger) {
+    this.serviceName = serviceName;
+    this.protocolMessageMetrics = protocolMessageMetrics;
+    this.logger = logger;
+  }
+
+  public RESPONSE processRequest(
+      REQUEST request,
+      FunctionWithServiceException<REQUEST, RESPONSE> methodCall,
+      ProtocolMessageEnum type,
+      String traceId) throws ServiceException {
+    Scope scope = TracingUtil
+        .importAndCreateScope(type.toString(), traceId);
+    try {
+      if (logger.isTraceEnabled()) {
+        logger.trace(
+            "{} {} request is received: <json>{}</json>",
+            serviceName,
+            type.toString(),
+            request.toString().replaceAll("\n", "\\\\n"));
+      } else if (logger.isDebugEnabled()) {
+        logger.debug("{} {} request is received",
+            serviceName, type.toString());
+      }
+      protocolMessageMetrics.increment(type);
+
+      RESPONSE response = methodCall.apply(request);
+
+      if (logger.isTraceEnabled()) {
+        logger.trace(
+            "{} {} request is processed. Response: "
+                + "<json>{}</json>",
+            serviceName,
+            type.toString(),
+            response.toString().replaceAll("\n", "\\\\n"));
+      }
+      return response;
+
+    } finally {
+      scope.close();
+    }
+  }
+}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
similarity index 85%
rename from hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
index bad24cf..b6ce067 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.ozone.protocolPB;
+package org.apache.hadoop.hdds.scm.protocol;
 
 import java.io.IOException;
 import java.util.List;
@@ -40,17 +40,15 @@ import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
-import io.opentracing.Scope;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,8 +66,9 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
   private static final Logger LOG = LoggerFactory
       .getLogger(ScmBlockLocationProtocolServerSideTranslatorPB.class);
 
-  private final ProtocolMessageMetrics
-      protocolMessageMetrics;
+  private final OzoneProtocolMessageDispatcher<SCMBlockLocationRequest,
+      SCMBlockLocationResponse>
+      dispatcher;
 
   /**
    * Creates a new ScmBlockLocationProtocolServerSideTranslatorPB.
@@ -80,7 +79,9 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
       ScmBlockLocationProtocol impl,
       ProtocolMessageMetrics metrics) throws IOException {
     this.impl = impl;
-    this.protocolMessageMetrics = metrics;
+    dispatcher = new OzoneProtocolMessageDispatcher<>(
+        "BlockLocationProtocol", metrics, LOG);
+
   }
 
   private SCMBlockLocationResponse.Builder createSCMBlockResponse(
@@ -94,43 +95,18 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
   @Override
   public SCMBlockLocationResponse send(RpcController controller,
       SCMBlockLocationRequest request) throws ServiceException {
-    String traceId = request.getTraceID();
-
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("BlockLocationProtocol {} request is received: <json>{}</json>",
-          request.getCmdType().toString(),
-          request.toString().replaceAll("\n", "\\\\n"));
-
-    } else if (LOG.isDebugEnabled()) {
-      LOG.debug("BlockLocationProtocol {} request is received",
-          request.getCmdType().toString());
-    }
-
-    protocolMessageMetrics.increment(request.getCmdType());
-
-    try (Scope scope = TracingUtil
-        .importAndCreateScope(
-            "ScmBlockLocationProtocol." + request.getCmdType(),
-            request.getTraceID())) {
-      SCMBlockLocationResponse response =
-          processMessage(request, traceId);
-
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(
-            "BlockLocationProtocol {} request is processed. Response: "
-                + "<json>{}</json>",
-            request.getCmdType().toString(),
-            response.toString().replaceAll("\n", "\\\\n"));
-      }
-      return response;
-    }
+    return dispatcher.processRequest(
+        request,
+        this::processMessage,
+        request.getCmdType(),
+        request.getTraceID());
   }
 
   private SCMBlockLocationResponse processMessage(
-      SCMBlockLocationRequest request, String traceId) throws ServiceException {
+      SCMBlockLocationRequest request) throws ServiceException {
     SCMBlockLocationResponse.Builder response = createSCMBlockResponse(
         request.getCmdType(),
-        traceId);
+        request.getTraceID());
     response.setSuccess(true);
     response.setStatus(Status.OK);
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
similarity index 81%
rename from hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 99c9e8d..9d53dbf 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -16,82 +16,56 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.ozone.protocolPB;
+package org.apache.hadoop.hdds.scm.protocol;
 
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import io.opentracing.Scope;
+import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.InSafeModeRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.InSafeModeResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ForceExitSafeModeRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
 import org.apache.hadoop.hdds.scm.ScmInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ActivatePipelineResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.DeactivatePipelineResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ContainerResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ClosePipelineResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.GetContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.GetContainerResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ObjectStageChangeResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.PipelineRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.PipelineResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 
-import java.io.IOException;
-import java.util.List;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import io.opentracing.Scope;
 
 /**
  * This class is the server-side translator that forwards requests received on
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/package-info.java
new file mode 100644
index 0000000..411f22e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.protocol;
+/**
+ * RPC/protobuf specific translator classes for SCM protocol.
+ */
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
index 500a8cd..5500891 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java
@@ -57,7 +57,7 @@ import org.apache.hadoop.ozone.audit.SCMAction;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
 import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
-import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
 
 import com.google.common.collect.Maps;
 import com.google.protobuf.BlockingService;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 7d9cb3e..e0136e8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -60,8 +60,7 @@ import org.apache.hadoop.ozone.audit.AuditLoggerType;
 import org.apache.hadoop.ozone.audit.AuditMessage;
 import org.apache.hadoop.ozone.audit.Auditor;
 import org.apache.hadoop.ozone.audit.SCMAction;
-import org.apache.hadoop.ozone.protocolPB
-    .StorageContainerLocationProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
index e08fdc1..d2044f5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/TestSCMBlockProtocolServer.java
@@ -25,8 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
-import org.apache.hadoop.ozone.protocolPB
-    .ScmBlockLocationProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.test.GenericTestUtils;
 
 import org.junit.After;
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java
index 73f1512..5ca0945 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.ozone.insight.BaseInsightPoint;
 import org.apache.hadoop.ozone.insight.Component.Type;
 import org.apache.hadoop.ozone.insight.LoggerSource;
 import org.apache.hadoop.ozone.insight.MetricGroupDisplay;
-import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
 
 /**
  * Insight metric to check the SCM block location protocol behaviour.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 6f8e9df..d4c029b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -17,7 +17,8 @@
 package org.apache.hadoop.ozone.protocolPB;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.tracing.TracingUtil;
+
+import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.NotLeaderException;
@@ -33,7 +34,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRespo
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
-import io.opentracing.Scope;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.util.ExitUtils;
 import org.slf4j.Logger;
@@ -58,8 +58,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
   private final boolean isRatisEnabled;
   private final OzoneManager ozoneManager;
   private final OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer;
-  private final ProtocolMessageMetrics protocolMessageMetrics;
   private final AtomicLong transactionIndex = new AtomicLong(0L);
+  private final OzoneProtocolMessageDispatcher<OMRequest, OMResponse>
+      dispatcher;
 
   /**
    * Constructs an instance of the server handler.
@@ -75,7 +76,6 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
     handler = new OzoneManagerRequestHandler(impl);
     this.omRatisServer = ratisServer;
     this.isRatisEnabled = enableRatis;
-    this.protocolMessageMetrics = metrics;
     this.ozoneManagerDoubleBuffer =
         new OzoneManagerDoubleBuffer(ozoneManager.getMetadataManager(), (i) -> {
           // Do nothing.
@@ -83,6 +83,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
           // As we wait until the double buffer flushes DB to disk.
         }, isRatisEnabled);
 
+    dispatcher = new OzoneProtocolMessageDispatcher<>("OzoneProtocol",
+        metrics, LOG);
+
   }
 
   /**
@@ -93,35 +96,9 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
   @Override
   public OMResponse submitRequest(RpcController controller,
       OMRequest request) throws ServiceException {
-    Scope scope = TracingUtil
-        .importAndCreateScope(request.getCmdType().name(),
-            request.getTraceID());
-    try {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(
-            "OzoneManagerProtocol {} request is received: <json>{}</json>",
-            request.getCmdType().toString(),
-            request.toString().replaceAll("\n", "\\\\n"));
-      } else if (LOG.isDebugEnabled()) {
-        LOG.debug("OzoneManagerProtocol {} request is received",
-            request.getCmdType().toString());
-      }
-      protocolMessageMetrics.increment(request.getCmdType());
 
-      OMResponse omResponse = processRequest(request);
-
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(
-            "OzoneManagerProtocol {} request is processed. Response: "
-                + "<json>{}</json>",
-            request.getCmdType().toString(),
-            omResponse.toString().replaceAll("\n", "\\\\n"));
-      }
-      return omResponse;
-
-    } finally {
-      scope.close();
-    }
+    return dispatcher.processRequest(request, this::processRequest,
+        request.getCmdType(), request.getTraceID());
   }
 
   private OMResponse processRequest(OMRequest request) throws


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org