You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/05/23 22:56:18 UTC

[26/50] [abbrv] hadoop git commit: HDDS-49. Standalone protocol should use grpc in place of netty. Contributed by Mukul Kumar Singh.

HDDS-49. Standalone protocol should use grpc in place of netty.
Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5a914069
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5a914069
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5a914069

Branch: refs/heads/HDDS-4
Commit: 5a9140690aba295ba1226a3190b52f34347a8372
Parents: 3e5f7ea
Author: Anu Engineer <ae...@apache.org>
Authored: Tue May 22 16:51:43 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue May 22 19:56:15 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hdds/scm/XceiverClientGrpc.java      | 217 +++++++++++++++++++
 .../hadoop/hdds/scm/XceiverClientManager.java   |  21 +-
 .../hadoop/hdds/scm/XceiverClientMetrics.java   |   8 +-
 .../common/dev-support/findbugsExcludeFile.xml  |   3 +
 hadoop-hdds/common/pom.xml                      |  17 ++
 .../apache/hadoop/hdds/scm/ScmConfigKeys.java   |   4 +
 .../main/proto/DatanodeContainerProtocol.proto  |   7 +
 .../common/src/main/resources/ozone-default.xml |   9 +
 .../common/helpers/ContainerMetrics.java        |  14 +-
 .../transport/server/GrpcXceiverService.java    |  82 +++++++
 .../transport/server/XceiverServerGrpc.java     | 105 +++++++++
 .../container/ozoneimpl/OzoneContainer.java     |  11 +-
 .../hadoop/ozone/MiniOzoneClusterImpl.java      |  10 +-
 .../ozone/scm/TestXceiverClientManager.java     |  67 ++++--
 hadoop-project/pom.xml                          |   1 +
 15 files changed, 540 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
new file mode 100644
index 0000000..84790e8
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
+import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.shaded.io.grpc.ManagedChannel;
+import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Client for the storageContainer protocol.
+ */
+public class XceiverClientGrpc extends XceiverClientSpi {
+  static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
+  private final Pipeline pipeline;
+  private final Configuration config;
+  private XceiverClientProtocolServiceStub asyncStub;
+  private XceiverClientMetrics metrics;
+  private ManagedChannel channel;
+  private final Semaphore semaphore;
+
+  /**
+   * Constructs a client that can communicate with the Container framework on
+   * data nodes.
+   *
+   * @param pipeline - Pipeline that defines the machines.
+   * @param config -- Ozone Config
+   */
+  public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
+    super();
+    Preconditions.checkNotNull(pipeline);
+    Preconditions.checkNotNull(config);
+    this.pipeline = pipeline;
+    this.config = config;
+    this.semaphore =
+        new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
+    this.metrics = XceiverClientManager.getXceiverClientMetrics();
+  }
+
+  @Override
+  public void connect() throws Exception {
+    DatanodeDetails leader = this.pipeline.getLeader();
+
+    // Read the port from the datanode; if it is unset (0), fall back to the
+    // configured default port.
+    int port = leader.getContainerPort();
+    if (port == 0) {
+      port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+          OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+    }
+    LOG.debug("Connecting to server Port : " + leader.getIpAddress());
+    channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
+        .usePlaintext(true)
+        .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+        .build();
+    asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);
+  }
+
+  /**
+   * Returns whether the xceiver client is connected to a server.
+   *
+   * @return True if the connection is alive, false otherwise.
+   */
+  @VisibleForTesting
+  public boolean isConnected() {
+    return !channel.isTerminated() && !channel.isShutdown();
+  }
+
+  @Override
+  public void close() {
+    channel.shutdownNow();
+    try {
+      channel.awaitTermination(60, TimeUnit.MINUTES);
+    } catch (Exception e) {
+      LOG.error("Unexpected exception while waiting for channel termination",
+          e);
+    }
+  }
+
+  @Override
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  @Override
+  public ContainerCommandResponseProto sendCommand(
+      ContainerCommandRequestProto request) throws IOException {
+    try {
+      return sendCommandAsync(request).get();
+    } catch (ExecutionException | InterruptedException e) {
+      /**
+       * In case the grpc channel handler throws an exception,
+       * the exception thrown will be wrapped within {@link ExecutionException}.
+       * Unwrapping here so that the original exception gets passed
+       * to the client.
+       */
+      if (e instanceof ExecutionException) {
+        Throwable cause = e.getCause();
+        if (cause instanceof IOException) {
+          throw (IOException) cause;
+        }
+      }
+      throw new IOException(
+          "Unexpected exception during execution:" + e.getMessage());
+    }
+  }
+
+  /**
+   * Sends a given command to the server and gets a waitable future back.
+   *
+   * @param request Request
+   * @return Response to the command
+   * @throws IOException
+   */
+  @Override
+  public CompletableFuture<ContainerCommandResponseProto>
+      sendCommandAsync(ContainerCommandRequestProto request)
+      throws IOException, ExecutionException, InterruptedException {
+    final CompletableFuture<ContainerCommandResponseProto> replyFuture =
+        new CompletableFuture<>();
+    semaphore.acquire();
+    long requestTime = Time.monotonicNowNanos();
+    metrics.incrPendingContainerOpsMetrics(request.getCmdType());
+    // Create a new grpc stream for each call.
+    final StreamObserver<ContainerCommandRequestProto> requestObserver =
+        asyncStub.send(new StreamObserver<ContainerCommandResponseProto>() {
+          @Override
+          public void onNext(ContainerCommandResponseProto value) {
+            replyFuture.complete(value);
+            metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+            metrics.addContainerOpsLatency(request.getCmdType(),
+                Time.monotonicNowNanos() - requestTime);
+            semaphore.release();
+          }
+          @Override
+          public void onError(Throwable t) {
+            replyFuture.completeExceptionally(t);
+            metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+            metrics.addContainerOpsLatency(request.getCmdType(),
+                Time.monotonicNowNanos() - requestTime);
+            semaphore.release();
+          }
+
+          @Override
+          public void onCompleted() {
+            if (!replyFuture.isDone()) {
+              replyFuture.completeExceptionally(
+                  new IOException("Stream completed but no reply for request "
+                      + request));
+            }
+          }
+        });
+    requestObserver.onNext(request);
+    requestObserver.onCompleted();
+    return replyFuture;
+  }
+
+  /**
+   * Create a pipeline.
+   *
+   * @param pipelineID - Name of the pipeline.
+   * @param datanodes - Datanodes
+   */
+  @Override
+  public void createPipeline(String pipelineID, List<DatanodeDetails> datanodes)
+      throws IOException {
+    // For a stand-alone pipeline there is no pipeline setup step.
+    return;
+  }
+
+  /**
+   * Returns pipeline Type.
+   *
+   * @return - Stand Alone as the type.
+   */
+  @Override
+  public HddsProtos.ReplicationType getPipelineType() {
+    return HddsProtos.ReplicationType.STAND_ALONE;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index dcaa576..8919797 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -41,8 +41,6 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys
     .SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
-    .ReplicationType.RATIS;
 
 /**
  * XceiverClientManager is responsible for the lifecycle of XceiverClient
@@ -62,6 +60,7 @@ public class XceiverClientManager implements Closeable {
   private final Configuration conf;
   private final Cache<Long, XceiverClientSpi> clientCache;
   private final boolean useRatis;
+  private final boolean useGrpc;
 
   private static XceiverClientMetrics metrics;
   /**
@@ -79,6 +78,8 @@ public class XceiverClientManager implements Closeable {
     this.useRatis = conf.getBoolean(
         ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
         ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
+    this.useGrpc = conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+        ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT);
     this.conf = conf;
     this.clientCache = CacheBuilder.newBuilder()
         .expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
@@ -146,9 +147,19 @@ public class XceiverClientManager implements Closeable {
           new Callable<XceiverClientSpi>() {
           @Override
           public XceiverClientSpi call() throws Exception {
-            XceiverClientSpi client = pipeline.getType() == RATIS ?
-                    XceiverClientRatis.newXceiverClientRatis(pipeline, conf)
-                    : new XceiverClient(pipeline, conf);
+            XceiverClientSpi client = null;
+            switch (pipeline.getType()) {
+            case RATIS:
+              client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
+              break;
+            case STAND_ALONE:
+              client = useGrpc ? new XceiverClientGrpc(pipeline, conf) :
+                  new XceiverClient(pipeline, conf);
+              break;
+            case CHAINED:
+            default:
+              throw new IOException ("not implemented" + pipeline.getType());
+            }
             client.connect();
             return client;
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
index fbc348c..a430400 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
@@ -49,13 +49,13 @@ public class XceiverClientMetrics {
     this.containerOpsLatency = new MutableRate[numEnumEntries];
     for (int i = 0; i < numEnumEntries; i++) {
       pendingOpsArray[i] = registry.newCounter(
-          "numPending" + ContainerProtos.Type.valueOf(i + 1),
-          "number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops",
+          "numPending" + ContainerProtos.Type.forNumber(i + 1),
+          "number of pending" + ContainerProtos.Type.forNumber(i + 1) + " ops",
           (long) 0);
 
       containerOpsLatency[i] = registry.newRate(
-          ContainerProtos.Type.valueOf(i + 1) + "Latency",
-          "latency of " + ContainerProtos.Type.valueOf(i + 1)
+          ContainerProtos.Type.forNumber(i + 1) + "Latency",
+          "latency of " + ContainerProtos.Type.forNumber(i + 1)
           + " ops");
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml
index 3571a89..daf6fec 100644
--- a/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml
@@ -18,4 +18,7 @@
   <Match>
     <Package name="org.apache.hadoop.hdds.protocol.proto"/>
   </Match>
+  <Match>
+    <Package name="org.apache.hadoop.hdds.protocol.datanode.proto"/>
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml
index 6310df1..a8a634c 100644
--- a/hadoop-hdds/common/pom.xml
+++ b/hadoop-hdds/common/pom.xml
@@ -61,6 +61,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>ratis-grpc</artifactId>
       <groupId>org.apache.ratis</groupId>
     </dependency>
+    <dependency>
+      <groupId>com.google.errorprone</groupId>
+      <artifactId>error_prone_annotations</artifactId>
+      <version>2.2.0</version>
+      <optional>true</optional>
+    </dependency>
 
     <dependency>
       <groupId>org.rocksdb</groupId>
@@ -108,7 +114,15 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
               <goals>
                 <goal>compile</goal>
                 <goal>test-compile</goal>
+                <goal>compile-custom</goal>
+                <goal>test-compile-custom</goal>
               </goals>
+              <configuration>
+                <pluginId>grpc-java</pluginId>
+                <pluginArtifact>
+                  io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+                </pluginArtifact>
+              </configuration>
           </execution>
         </executions>
       </plugin>
@@ -122,6 +136,9 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
                 <replace token="com.google.protobuf" value="org.apache.ratis.shaded.com.google.protobuf"
                   dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto">
                 </replace>
+                <replace token="io.grpc" value="org.apache.ratis.shaded.io.grpc"
+                  dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto">
+                </replace>
               </tasks>
             </configuration>
             <goals>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 29ccf30..85407e6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -49,6 +49,10 @@ public final class ScmConfigKeys {
       = "dfs.container.ratis.enabled";
   public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
       = false;
+  public static final String DFS_CONTAINER_GRPC_ENABLED_KEY
+      = "dfs.container.grpc.enabled";
+  public static final boolean DFS_CONTAINER_GRPC_ENABLED_DEFAULT
+      = false;
   public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY
       = "dfs.container.ratis.rpc.type";
   public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 95b7cbb..1138297 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -24,6 +24,7 @@
 
 // This file contains protocol buffers that are used to transfer data
 // to and from the datanode.
+syntax = "proto2";
 option java_package = "org.apache.hadoop.hdds.protocol.datanode.proto";
 option java_outer_classname = "ContainerProtos";
 option java_generate_equals_and_hash = true;
@@ -418,3 +419,9 @@ message CopyContainerResponseProto {
   repeated bytes data = 5;
   optional int64 checksum = 6;
 }
+
+service XceiverClientProtocolService {
+  // A client-to-datanode RPC to send container commands
+  rpc send(stream ContainerCommandRequestProto) returns
+    (stream ContainerCommandResponseProto) {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index e0aca67..7a91610 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -72,6 +72,15 @@
     </description>
   </property>
   <property>
+    <name>dfs.container.grpc.enabled</name>
+    <value>false</value>
+    <tag>OZONE, MANAGEMENT, PIPELINE, RATIS</tag>
+    <description>Ozone supports different kinds of replication pipeline
+      protocols. gRPC is one of the replication pipeline protocols supported
+      by Ozone.
+    </description>
+  </property>
+  <property>
     <name>dfs.container.ratis.ipc</name>
     <value>9858</value>
     <tag>OZONE, CONTAINER, PIPELINE, RATIS, MANAGEMENT</tag>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
index 4300b2d..714db59 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
@@ -63,20 +63,20 @@ public class ContainerMetrics {
     this.registry = new MetricsRegistry("StorageContainerMetrics");
     for (int i = 0; i < numEnumEntries; i++) {
       numOpsArray[i] = registry.newCounter(
-          "num" + ContainerProtos.Type.valueOf(i + 1),
-          "number of " + ContainerProtos.Type.valueOf(i + 1) + " ops",
+          "num" + ContainerProtos.Type.forNumber(i + 1),
+          "number of " + ContainerProtos.Type.forNumber(i + 1) + " ops",
           (long) 0);
       opsBytesArray[i] = registry.newCounter(
-          "bytes" + ContainerProtos.Type.valueOf(i + 1),
-          "bytes used by " + ContainerProtos.Type.valueOf(i + 1) + "op",
+          "bytes" + ContainerProtos.Type.forNumber(i + 1),
+          "bytes used by " + ContainerProtos.Type.forNumber(i + 1) + "op",
           (long) 0);
       opsLatency[i] = registry.newRate(
-          "latency" + ContainerProtos.Type.valueOf(i + 1),
-          ContainerProtos.Type.valueOf(i + 1) + " op");
+          "latency" + ContainerProtos.Type.forNumber(i + 1),
+          ContainerProtos.Type.forNumber(i + 1) + " op");
 
       for (int j = 0; j < len; j++) {
         int interval = intervals[j];
-        String quantileName = ContainerProtos.Type.valueOf(i + 1) + "Nanos"
+        String quantileName = ContainerProtos.Type.forNumber(i + 1) + "Nanos"
             + interval + "s";
         opsLatQuantiles[i][j] = registry.newQuantiles(quantileName,
             "latency of Container ops", "ops", "latency", interval);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
new file mode 100644
index 0000000..df6220c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+    .XceiverClientProtocolServiceGrpc;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Grpc Service for handling Container Commands on datanode.
+ */
+public class GrpcXceiverService extends
+    XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceImplBase {
+  public static final Logger
+      LOG = LoggerFactory.getLogger(GrpcXceiverService.class);
+
+  private final ContainerDispatcher dispatcher;
+
+  public GrpcXceiverService(ContainerDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public StreamObserver<ContainerCommandRequestProto> send(
+      StreamObserver<ContainerCommandResponseProto> responseObserver) {
+    return new StreamObserver<ContainerCommandRequestProto>() {
+      private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+      @Override
+      public void onNext(ContainerCommandRequestProto request) {
+        try {
+          ContainerCommandResponseProto resp = dispatcher.dispatch(request);
+          responseObserver.onNext(resp);
+        } catch (Throwable e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("{} got exception when processing"
+                    + " ContainerCommandRequestProto {}: {}", request, e);
+          }
+          responseObserver.onError(e);
+        }
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        // for now we just log a msg
+        LOG.info("{}: ContainerCommand send on error. Exception: {}", t);
+      }
+
+      @Override
+      public void onCompleted() {
+        if (isClosed.compareAndSet(false, true)) {
+          LOG.info("{}: ContainerCommand send completed");
+          responseObserver.onCompleted();
+        }
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
new file mode 100644
index 0000000..30a2f87
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.ratis.shaded.io.grpc.Server;
+import org.apache.ratis.shaded.io.grpc.ServerBuilder;
+import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+
+/**
+ * Creates a Grpc server endpoint that acts as the communication layer for
+ * Ozone containers.
+ */
+public final class XceiverServerGrpc implements XceiverServerSpi {
+  private static final Logger
+      LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
+  private int port;
+  private Server server;
+
+  /**
+   * Constructs a Grpc server class.
+   *
+   * @param conf - Configuration
+   */
+  public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
+                       ContainerDispatcher dispatcher) {
+    Preconditions.checkNotNull(conf);
+
+    this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+        OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+    // Get an available port on current node and
+    // use that as the container port
+    if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
+        OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
+      try (ServerSocket socket = new ServerSocket()) {
+        socket.setReuseAddress(true);
+        SocketAddress address = new InetSocketAddress(0);
+        socket.bind(address);
+        this.port = socket.getLocalPort();
+        LOG.info("Found a free port for the server : {}", this.port);
+      } catch (IOException e) {
+        LOG.error("Unable find a random free port for the server, "
+            + "fallback to use default port {}", this.port, e);
+      }
+    }
+    datanodeDetails.setContainerPort(port);
+    server = ((NettyServerBuilder) ServerBuilder.forPort(port))
+        .maxMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+        .addService(new GrpcXceiverService(dispatcher))
+        .build();
+  }
+
+  @Override
+  public int getIPCPort() {
+    return this.port;
+  }
+
+  /**
+   * Returns the Replication type supported by this end-point.
+   *
+   * @return enum -- {Stand_Alone, Ratis, Grpc, Chained}
+   */
+  @Override
+  public HddsProtos.ReplicationType getServerType() {
+    return HddsProtos.ReplicationType.STAND_ALONE;
+  }
+
+  @Override
+  public void start() throws IOException {
+    server.start();
+  }
+
+  @Override
+  public void stop() {
+    server.shutdown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 1fc79d7..b497cdc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -40,6 +41,8 @@ import org.apache.hadoop.ozone.container.common.statemachine.background
     .BlockDeletingService;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
 import org.apache.hadoop.ozone.container.common.transport.server
+    .XceiverServerGrpc;
+import org.apache.hadoop.ozone.container.common.transport.server
     .XceiverServerSpi;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis
     .XceiverServerRatis;
@@ -121,8 +124,14 @@ public class OzoneContainer {
 
     this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
 
+    boolean useGrpc = this.ozoneConfig.getBoolean(
+        ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+        ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT);
     server = new XceiverServerSpi[]{
-        new XceiverServer(datanodeDetails, this.ozoneConfig, this.dispatcher),
+        useGrpc ? new XceiverServerGrpc(datanodeDetails,
+            this.ozoneConfig, this.dispatcher) :
+            new XceiverServer(datanodeDetails,
+                this.ozoneConfig, this.dispatcher),
       XceiverServerRatis
           .newXceiverServerRatis(datanodeDetails, this.ozoneConfig, dispatcher)
     };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 08d7176..9936815 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -220,13 +220,13 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
     datanodeService.stop();
     datanodeService.join();
     // ensure same ports are used across restarts.
-    Configuration conf = datanodeService.getConf();
+    Configuration config = datanodeService.getConf();
     int currentPort = datanodeService.getDatanodeDetails().getContainerPort();
-    conf.setInt(DFS_CONTAINER_IPC_PORT, currentPort);
-    conf.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false);
+    config.setInt(DFS_CONTAINER_IPC_PORT, currentPort);
+    config.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false);
     int ratisPort = datanodeService.getDatanodeDetails().getRatisPort();
-    conf.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
-    conf.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
+    config.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
+    config.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
     datanodeService.start(null);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
index 07ad6ef..77e4e1b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.scm;
 
 import com.google.common.cache.Cache;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -30,13 +31,17 @@ import org.apache.hadoop.hdds.scm.protocolPB
     .StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.junit.Assert;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 
 import static org.apache.hadoop.hdds.scm
     .ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
@@ -44,19 +49,32 @@ import static org.apache.hadoop.hdds.scm
 /**
  * Test for XceiverClientManager caching and eviction.
  */
+@RunWith(Parameterized.class)
 public class TestXceiverClientManager {
   private static OzoneConfiguration config;
   private static MiniOzoneCluster cluster;
   private static StorageContainerLocationProtocolClientSideTranslatorPB
       storageContainerLocationClient;
   private static String containerOwner = "OZONE";
+  private final boolean shouldUseGrpc; // per-instance Parameterized value
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> withGrpc() {
+    return Arrays.asList(new Object[][] {{false}, {true}}); // gRPC off, on
+  }
+
+  public TestXceiverClientManager(boolean useGrpc) {
+    this.shouldUseGrpc = useGrpc; // instance state; no static write here
+  }
 
   @Rule
   public ExpectedException exception = ExpectedException.none();
 
-  @BeforeClass
-  public static void init() throws Exception {
+  @Before
+  public void init() throws Exception {
     config = new OzoneConfiguration();
+    config.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+        shouldUseGrpc);
     cluster = MiniOzoneCluster.newBuilder(config)
         .setNumDatanodes(3)
         .build();
@@ -65,8 +83,8 @@ public class TestXceiverClientManager {
         .getStorageContainerLocationClient();
   }
 
-  @AfterClass
-  public static void shutdown() {
+  @After
+  public void shutdown() {
     if (cluster != null) {
       cluster.shutdown();
     }
@@ -76,6 +94,8 @@ public class TestXceiverClientManager {
   @Test
   public void testCaching() throws IOException {
     OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+        shouldUseGrpc);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
 
     ContainerInfo container1 = storageContainerLocationClient
@@ -106,6 +126,8 @@ public class TestXceiverClientManager {
   public void testFreeByReference() throws IOException {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
+    conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+        shouldUseGrpc);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
     Cache<Long, XceiverClientSpi> cache =
         clientManager.getClientCache();
@@ -140,10 +162,18 @@ public class TestXceiverClientManager {
     // After releasing the client, this connection should be closed
     // and any container operations should fail
     clientManager.releaseClient(client1);
-    exception.expect(IOException.class);
-    exception.expectMessage("This channel is not connected.");
-    ContainerProtocolCalls.createContainer(client1,
-        container1.getContainerID(), traceID1);
+
+    String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" :
+        "This channel is not connected.";
+    try {
+      ContainerProtocolCalls.createContainer(client1,
+          container1.getContainerID(), traceID1);
+      Assert.fail("Create container should throw exception on closed"
+          + " client");
+    } catch (Exception e) {
+      Assert.assertEquals(IOException.class, e.getClass()); // expected first
+      Assert.assertTrue(e.getMessage().contains(expectedMessage));
+    }
     clientManager.releaseClient(client2);
   }
 
@@ -151,6 +181,8 @@ public class TestXceiverClientManager {
   public void testFreeByEviction() throws IOException {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
+    conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+        shouldUseGrpc);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
     Cache<Long, XceiverClientSpi> cache =
         clientManager.getClientCache();
@@ -181,10 +213,17 @@ public class TestXceiverClientManager {
 
     // Any container operation should now fail
     String traceID2 = "trace" + RandomStringUtils.randomNumeric(4);
-    exception.expect(IOException.class);
-    exception.expectMessage("This channel is not connected.");
-    ContainerProtocolCalls.createContainer(client1,
-        container1.getContainerID(), traceID2);
+    String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" :
+        "This channel is not connected.";
+    try {
+      ContainerProtocolCalls.createContainer(client1,
+          container1.getContainerID(), traceID2);
+      Assert.fail("Create container should throw exception on closed"
+          + " client");
+    } catch (Exception e) {
+      Assert.assertEquals(IOException.class, e.getClass()); // expected first
+      Assert.assertTrue(e.getMessage().contains(expectedMessage));
+    }
     clientManager.releaseClient(client2);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index a916108..73c3f5b 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -107,6 +107,7 @@
     <!-- Maven protoc compiler -->
     <protobuf-maven-plugin.version>0.5.1</protobuf-maven-plugin.version>
     <protobuf-compile.version>3.5.0</protobuf-compile.version>
+    <grpc.version>1.10.0</grpc.version>
     <os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
 
     <!-- define the Java language version used by the compiler -->


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org