You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/05/23 02:56:27 UTC
hadoop git commit: HDDS-49. Standalone protocol should use grpc in
place of netty. Contributed by Mukul Kumar Singh.
Repository: hadoop
Updated Branches:
refs/heads/trunk 3e5f7ea98 -> 5a9140690
HDDS-49. Standalone protocol should use grpc in place of netty.
Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5a914069
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5a914069
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5a914069
Branch: refs/heads/trunk
Commit: 5a9140690aba295ba1226a3190b52f34347a8372
Parents: 3e5f7ea
Author: Anu Engineer <ae...@apache.org>
Authored: Tue May 22 16:51:43 2018 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue May 22 19:56:15 2018 -0700
----------------------------------------------------------------------
.../hadoop/hdds/scm/XceiverClientGrpc.java | 217 +++++++++++++++++++
.../hadoop/hdds/scm/XceiverClientManager.java | 21 +-
.../hadoop/hdds/scm/XceiverClientMetrics.java | 8 +-
.../common/dev-support/findbugsExcludeFile.xml | 3 +
hadoop-hdds/common/pom.xml | 17 ++
.../apache/hadoop/hdds/scm/ScmConfigKeys.java | 4 +
.../main/proto/DatanodeContainerProtocol.proto | 7 +
.../common/src/main/resources/ozone-default.xml | 9 +
.../common/helpers/ContainerMetrics.java | 14 +-
.../transport/server/GrpcXceiverService.java | 82 +++++++
.../transport/server/XceiverServerGrpc.java | 105 +++++++++
.../container/ozoneimpl/OzoneContainer.java | 11 +-
.../hadoop/ozone/MiniOzoneClusterImpl.java | 10 +-
.../ozone/scm/TestXceiverClientManager.java | 67 ++++--
hadoop-project/pom.xml | 1 +
15 files changed, 540 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
new file mode 100644
index 0000000..84790e8
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc;
+import org.apache.hadoop.hdds.protocol.datanode.proto.XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceStub;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.shaded.io.grpc.ManagedChannel;
+import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A client for the storage container protocol that talks to the pipeline
+ * leader over gRPC.
+ *
+ * <p>{@link #connect()} creates the channel and stub and therefore has to be
+ * called before a command is sent. The number of outstanding requests is
+ * bounded by a semaphore sized from
+ * {@code HddsClientUtils.getMaxOutstandingRequests(config)}.
+ */
+public class XceiverClientGrpc extends XceiverClientSpi {
+  static final Logger LOG = LoggerFactory.getLogger(XceiverClientGrpc.class);
+  private final Pipeline pipeline;
+  private final Configuration config;
+  private XceiverClientProtocolServiceStub asyncStub;
+  private XceiverClientMetrics metrics;
+  private ManagedChannel channel;
+  private final Semaphore semaphore;
+
+  /**
+   * Constructs a client that can communicate with the Container framework on
+   * data nodes.
+   *
+   * @param pipeline - Pipeline that defines the machines.
+   * @param config -- Ozone Config
+   */
+  public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
+    super();
+    Preconditions.checkNotNull(pipeline);
+    Preconditions.checkNotNull(config);
+    this.pipeline = pipeline;
+    this.config = config;
+    this.semaphore =
+        new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
+    this.metrics = XceiverClientManager.getXceiverClientMetrics();
+  }
+
+  /**
+   * Opens a plaintext gRPC channel to the pipeline leader and creates the
+   * asynchronous stub used by {@link #sendCommandAsync}.
+   *
+   * @throws Exception declared by the overridden method.
+   */
+  @Override
+  public void connect() throws Exception {
+    DatanodeDetails leader = this.pipeline.getLeader();
+
+    // read port from the data node, on failure use default configured
+    // port.
+    int port = leader.getContainerPort();
+    if (port == 0) {
+      port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+          OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+    }
+    // Log both address and port; the message used to label the IP address
+    // as the port.
+    LOG.debug("Connecting to server : {}:{}", leader.getIpAddress(), port);
+    channel = NettyChannelBuilder.forAddress(leader.getIpAddress(), port)
+        .usePlaintext(true)
+        .maxInboundMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+        .build();
+    asyncStub = XceiverClientProtocolServiceGrpc.newStub(channel);
+  }
+
+  /**
+   * Returns if the xceiver client connects to a server.
+   *
+   * @return True if a channel exists and is neither shut down nor
+   * terminated, false otherwise (including before {@link #connect()}).
+   */
+  @VisibleForTesting
+  public boolean isConnected() {
+    return channel != null
+        && !channel.isTerminated() && !channel.isShutdown();
+  }
+
+  /**
+   * Shuts the channel down and waits for it to terminate. Does nothing when
+   * the client was never connected.
+   */
+  @Override
+  public void close() {
+    if (channel == null) {
+      // connect() was never called, nothing to release.
+      return;
+    }
+    channel.shutdownNow();
+    try {
+      channel.awaitTermination(60, TimeUnit.MINUTES);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller instead of swallowing it.
+      Thread.currentThread().interrupt();
+      LOG.error("Unexpected exception while waiting for channel termination",
+          e);
+    } catch (Exception e) {
+      LOG.error("Unexpected exception while waiting for channel termination",
+          e);
+    }
+  }
+
+  @Override
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  /**
+   * Sends a command and blocks until the reply arrives.
+   *
+   * @param request Request
+   * @return Response to the command
+   * @throws IOException if the call fails or the waiting thread is
+   * interrupted; the original failure is attached as the cause.
+   */
+  @Override
+  public ContainerCommandResponseProto sendCommand(
+      ContainerCommandRequestProto request) throws IOException {
+    try {
+      return sendCommandAsync(request).get();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers further up can observe it.
+      Thread.currentThread().interrupt();
+      throw new IOException(
+          "Unexpected exception during execution:" + e.getMessage(), e);
+    } catch (ExecutionException e) {
+      /**
+       * In case the grpc channel handler throws an exception,
+       * the exception thrown will be wrapped within
+       * {@link ExecutionException}. Unwrapping here so that the original
+       * exception gets passed to the client.
+       */
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      }
+      throw new IOException(
+          "Unexpected exception during execution:" + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Sends a given command to server gets a waitable future back.
+   *
+   * <p>Blocks while the maximum number of outstanding requests is reached.
+   * The permit and the pending-ops metric are given back exactly once per
+   * request, whichever stream callback terminates it first.
+   *
+   * @param request Request
+   * @return Response to the command
+   * @throws IOException declared by the overridden method.
+   * @throws ExecutionException declared by the overridden method.
+   * @throws InterruptedException if interrupted while waiting for a permit.
+   */
+  @Override
+  public CompletableFuture<ContainerCommandResponseProto>
+      sendCommandAsync(ContainerCommandRequestProto request)
+      throws IOException, ExecutionException, InterruptedException {
+    final CompletableFuture<ContainerCommandResponseProto> replyFuture =
+        new CompletableFuture<>();
+    semaphore.acquire();
+    final long requestTime = Time.monotonicNowNanos();
+    metrics.incrPendingContainerOpsMetrics(request.getCmdType());
+    // create a new grpc stream for each call.
+    final StreamObserver<ContainerCommandRequestProto> requestObserver =
+        asyncStub.send(new StreamObserver<ContainerCommandResponseProto>() {
+          // Guards the bookkeeping so that it runs once per request even
+          // if several callbacks fire (e.g. onNext followed by onError).
+          private final java.util.concurrent.atomic.AtomicBoolean finished =
+              new java.util.concurrent.atomic.AtomicBoolean(false);
+
+          private void finish() {
+            if (finished.compareAndSet(false, true)) {
+              metrics.decrPendingContainerOpsMetrics(request.getCmdType());
+              metrics.addContainerOpsLatency(request.getCmdType(),
+                  Time.monotonicNowNanos() - requestTime);
+              semaphore.release();
+            }
+          }
+
+          @Override
+          public void onNext(ContainerCommandResponseProto value) {
+            replyFuture.complete(value);
+            finish();
+          }
+
+          @Override
+          public void onError(Throwable t) {
+            replyFuture.completeExceptionally(t);
+            finish();
+          }
+
+          @Override
+          public void onCompleted() {
+            if (!replyFuture.isDone()) {
+              replyFuture.completeExceptionally(
+                  new IOException("Stream completed but no reply for request "
+                      + request));
+            }
+            // The stream may end without a reply; without this the permit
+            // and the pending counter would never be given back.
+            finish();
+          }
+        });
+    requestObserver.onNext(request);
+    requestObserver.onCompleted();
+    return replyFuture;
+  }
+
+  /**
+   * Create a pipeline.
+   *
+   * @param pipelineID - Name of the pipeline.
+   * @param datanodes - Datanodes
+   */
+  @Override
+  public void createPipeline(String pipelineID, List<DatanodeDetails> datanodes)
+      throws IOException {
+    // For stand alone pipeline, there is no notion called setup pipeline.
+  }
+
+  /**
+   * Returns pipeline Type.
+   *
+   * @return - Stand Alone as the type.
+   */
+  @Override
+  public HddsProtos.ReplicationType getPipelineType() {
+    return HddsProtos.ReplicationType.STAND_ALONE;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index dcaa576..8919797 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -41,8 +41,6 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_DEFAULT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.SCM_CONTAINER_CLIENT_STALE_THRESHOLD_KEY;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
- .ReplicationType.RATIS;
/**
* XceiverClientManager is responsible for the lifecycle of XceiverClient
@@ -62,6 +60,7 @@ public class XceiverClientManager implements Closeable {
private final Configuration conf;
private final Cache<Long, XceiverClientSpi> clientCache;
private final boolean useRatis;
+ private final boolean useGrpc;
private static XceiverClientMetrics metrics;
/**
@@ -79,6 +78,8 @@ public class XceiverClientManager implements Closeable {
this.useRatis = conf.getBoolean(
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
+ this.useGrpc = conf.getBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+ ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT);
this.conf = conf;
this.clientCache = CacheBuilder.newBuilder()
.expireAfterAccess(staleThresholdMs, TimeUnit.MILLISECONDS)
@@ -146,9 +147,19 @@ public class XceiverClientManager implements Closeable {
new Callable<XceiverClientSpi>() {
@Override
public XceiverClientSpi call() throws Exception {
- XceiverClientSpi client = pipeline.getType() == RATIS ?
- XceiverClientRatis.newXceiverClientRatis(pipeline, conf)
- : new XceiverClient(pipeline, conf);
+ XceiverClientSpi client = null;
+ switch (pipeline.getType()) {
+ case RATIS:
+ client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf);
+ break;
+ case STAND_ALONE:
+ client = useGrpc ? new XceiverClientGrpc(pipeline, conf) :
+ new XceiverClient(pipeline, conf);
+ break;
+ case CHAINED:
+ default:
+ throw new IOException ("not implemented" + pipeline.getType());
+ }
client.connect();
return client;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
index fbc348c..a430400 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java
@@ -49,13 +49,13 @@ public class XceiverClientMetrics {
this.containerOpsLatency = new MutableRate[numEnumEntries];
for (int i = 0; i < numEnumEntries; i++) {
pendingOpsArray[i] = registry.newCounter(
- "numPending" + ContainerProtos.Type.valueOf(i + 1),
- "number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops",
+ "numPending" + ContainerProtos.Type.forNumber(i + 1),
+ "number of pending" + ContainerProtos.Type.forNumber(i + 1) + " ops",
(long) 0);
containerOpsLatency[i] = registry.newRate(
- ContainerProtos.Type.valueOf(i + 1) + "Latency",
- "latency of " + ContainerProtos.Type.valueOf(i + 1)
+ ContainerProtos.Type.forNumber(i + 1) + "Latency",
+ "latency of " + ContainerProtos.Type.forNumber(i + 1)
+ " ops");
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml b/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml
index 3571a89..daf6fec 100644
--- a/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdds/common/dev-support/findbugsExcludeFile.xml
@@ -18,4 +18,7 @@
<Match>
<Package name="org.apache.hadoop.hdds.protocol.proto"/>
</Match>
+ <Match>
+ <Package name="org.apache.hadoop.hdds.protocol.datanode.proto"/>
+ </Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml
index 6310df1..a8a634c 100644
--- a/hadoop-hdds/common/pom.xml
+++ b/hadoop-hdds/common/pom.xml
@@ -61,6 +61,12 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>ratis-grpc</artifactId>
<groupId>org.apache.ratis</groupId>
</dependency>
+ <dependency>
+ <groupId>com.google.errorprone</groupId>
+ <artifactId>error_prone_annotations</artifactId>
+ <version>2.2.0</version>
+ <optional>true</optional>
+ </dependency>
<dependency>
<groupId>org.rocksdb</groupId>
@@ -108,7 +114,15 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<goals>
<goal>compile</goal>
<goal>test-compile</goal>
+ <goal>compile-custom</goal>
+ <goal>test-compile-custom</goal>
</goals>
+ <configuration>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>
+ io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}
+ </pluginArtifact>
+ </configuration>
</execution>
</executions>
</plugin>
@@ -122,6 +136,9 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<replace token="com.google.protobuf" value="org.apache.ratis.shaded.com.google.protobuf"
dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto">
</replace>
+ <replace token="io.grpc" value="org.apache.ratis.shaded.io.grpc"
+ dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto">
+ </replace>
</tasks>
</configuration>
<goals>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 29ccf30..85407e6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -49,6 +49,10 @@ public final class ScmConfigKeys {
= "dfs.container.ratis.enabled";
public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT
= false;
+ public static final String DFS_CONTAINER_GRPC_ENABLED_KEY
+ = "dfs.container.grpc.enabled";
+ public static final boolean DFS_CONTAINER_GRPC_ENABLED_DEFAULT
+ = false;
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_KEY
= "dfs.container.ratis.rpc.type";
public static final String DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 95b7cbb..1138297 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -24,6 +24,7 @@
// This file contains protocol buffers that are used to transfer data
// to and from the datanode.
+syntax = "proto2";
option java_package = "org.apache.hadoop.hdds.protocol.datanode.proto";
option java_outer_classname = "ContainerProtos";
option java_generate_equals_and_hash = true;
@@ -418,3 +419,9 @@ message CopyContainerResponseProto {
repeated bytes data = 5;
optional int64 checksum = 6;
}
+
+service XceiverClientProtocolService {
+ // A client-to-datanode RPC to send container commands
+ rpc send(stream ContainerCommandRequestProto) returns
+ (stream ContainerCommandResponseProto) {}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index e0aca67..7a91610 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -72,6 +72,15 @@
</description>
</property>
<property>
+ <name>dfs.container.grpc.enabled</name>
+ <value>false</value>
+ <tag>OZONE, MANAGEMENT, PIPELINE, RATIS</tag>
+ <description>Ozone supports different kinds of replication pipeline
+ protocols. grpc is one of the replication pipeline protocols supported by
+ ozone.
+ </description>
+ </property>
+ <property>
<name>dfs.container.ratis.ipc</name>
<value>9858</value>
<tag>OZONE, CONTAINER, PIPELINE, RATIS, MANAGEMENT</tag>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
index 4300b2d..714db59 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
@@ -63,20 +63,20 @@ public class ContainerMetrics {
this.registry = new MetricsRegistry("StorageContainerMetrics");
for (int i = 0; i < numEnumEntries; i++) {
numOpsArray[i] = registry.newCounter(
- "num" + ContainerProtos.Type.valueOf(i + 1),
- "number of " + ContainerProtos.Type.valueOf(i + 1) + " ops",
+ "num" + ContainerProtos.Type.forNumber(i + 1),
+ "number of " + ContainerProtos.Type.forNumber(i + 1) + " ops",
(long) 0);
opsBytesArray[i] = registry.newCounter(
- "bytes" + ContainerProtos.Type.valueOf(i + 1),
- "bytes used by " + ContainerProtos.Type.valueOf(i + 1) + "op",
+ "bytes" + ContainerProtos.Type.forNumber(i + 1),
+ "bytes used by " + ContainerProtos.Type.forNumber(i + 1) + "op",
(long) 0);
opsLatency[i] = registry.newRate(
- "latency" + ContainerProtos.Type.valueOf(i + 1),
- ContainerProtos.Type.valueOf(i + 1) + " op");
+ "latency" + ContainerProtos.Type.forNumber(i + 1),
+ ContainerProtos.Type.forNumber(i + 1) + " op");
for (int j = 0; j < len; j++) {
int interval = intervals[j];
- String quantileName = ContainerProtos.Type.valueOf(i + 1) + "Nanos"
+ String quantileName = ContainerProtos.Type.forNumber(i + 1) + "Nanos"
+ interval + "s";
opsLatQuantiles[i][j] = registry.newQuantiles(quantileName,
"latency of Container ops", "ops", "latency", interval);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
new file mode 100644
index 0000000..df6220c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto
+ .XceiverClientProtocolServiceGrpc;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.ratis.shaded.io.grpc.stub.StreamObserver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Grpc Service for handling Container Commands on datanode.
+ *
+ * <p>Each request received on the stream is handed to the
+ * {@link ContainerDispatcher} and the resulting response is written back on
+ * the response stream.
+ */
+public class GrpcXceiverService extends
+    XceiverClientProtocolServiceGrpc.XceiverClientProtocolServiceImplBase {
+  public static final Logger
+      LOG = LoggerFactory.getLogger(GrpcXceiverService.class);
+
+  private final ContainerDispatcher dispatcher;
+
+  /**
+   * @param dispatcher dispatcher that executes the container commands.
+   */
+  public GrpcXceiverService(ContainerDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  /**
+   * Opens a bidirectional command stream.
+   *
+   * @param responseObserver stream on which responses are written.
+   * @return observer that receives the requests of this stream.
+   */
+  @Override
+  public StreamObserver<ContainerCommandRequestProto> send(
+      StreamObserver<ContainerCommandResponseProto> responseObserver) {
+    return new StreamObserver<ContainerCommandRequestProto>() {
+      // Set once the response stream has been terminated, either by
+      // onError or by onCompleted, so that it is terminated only once.
+      private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+      @Override
+      public void onNext(ContainerCommandRequestProto request) {
+        try {
+          ContainerCommandResponseProto resp = dispatcher.dispatch(request);
+          responseObserver.onNext(resp);
+        } catch (Throwable e) {
+          // The trailing Throwable is logged as the exception; it must not
+          // be counted as a placeholder argument.
+          LOG.debug("Got exception when processing"
+              + " ContainerCommandRequestProto {}", request, e);
+          // onError terminates the response stream; remember that so a
+          // later onCompleted does not signal completion a second time.
+          if (isClosed.compareAndSet(false, true)) {
+            responseObserver.onError(e);
+          }
+        }
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        // for now we just log a msg
+        LOG.info("ContainerCommand send on error. Exception: ", t);
+      }
+
+      @Override
+      public void onCompleted() {
+        if (isClosed.compareAndSet(false, true)) {
+          LOG.info("ContainerCommand send completed");
+          responseObserver.onCompleted();
+        }
+      }
+    };
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
new file mode 100644
index 0000000..30a2f87
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.ratis.shaded.io.grpc.Server;
+import org.apache.ratis.shaded.io.grpc.ServerBuilder;
+import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.SocketAddress;
+
+/**
+ * Creates a Grpc server endpoint that acts as the communication layer for
+ * Ozone containers.
+ */
+public final class XceiverServerGrpc implements XceiverServerSpi {
+ private static final Logger
+ LOG = LoggerFactory.getLogger(XceiverServerGrpc.class);
+ private int port;
+ private Server server;
+
+ /**
+ * Constructs a Grpc server class.
+ *
+ * @param conf - Configuration
+ */
+ public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
+ ContainerDispatcher dispatcher) {
+ Preconditions.checkNotNull(conf);
+
+ this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
+ OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+ // Get an available port on current node and
+ // use that as the container port
+ if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
+ OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
+ try (ServerSocket socket = new ServerSocket()) {
+ socket.setReuseAddress(true);
+ SocketAddress address = new InetSocketAddress(0);
+ socket.bind(address);
+ this.port = socket.getLocalPort();
+ LOG.info("Found a free port for the server : {}", this.port);
+ } catch (IOException e) {
+ LOG.error("Unable find a random free port for the server, "
+ + "fallback to use default port {}", this.port, e);
+ }
+ }
+ datanodeDetails.setContainerPort(port);
+ server = ((NettyServerBuilder) ServerBuilder.forPort(port))
+ .maxMessageSize(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE)
+ .addService(new GrpcXceiverService(dispatcher))
+ .build();
+ }
+
+ @Override
+ public int getIPCPort() {
+ return this.port;
+ }
+
+ /**
+ * Returns the Replication type supported by this end-point.
+ *
+ * @return enum -- {Stand_Alone, Ratis, Grpc, Chained}
+ */
+ @Override
+ public HddsProtos.ReplicationType getServerType() {
+ return HddsProtos.ReplicationType.STAND_ALONE;
+ }
+
+ @Override
+ public void start() throws IOException {
+ server.start();
+ }
+
+ @Override
+ public void stop() {
+ server.shutdown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 1fc79d7..b497cdc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -40,6 +41,8 @@ import org.apache.hadoop.ozone.container.common.statemachine.background
.BlockDeletingService;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.container.common.transport.server
+ .XceiverServerGrpc;
+import org.apache.hadoop.ozone.container.common.transport.server
.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis
.XceiverServerRatis;
@@ -121,8 +124,14 @@ public class OzoneContainer {
this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
+ boolean useGrpc = this.ozoneConfig.getBoolean(
+ ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+ ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_DEFAULT);
server = new XceiverServerSpi[]{
- new XceiverServer(datanodeDetails, this.ozoneConfig, this.dispatcher),
+ useGrpc ? new XceiverServerGrpc(datanodeDetails,
+ this.ozoneConfig, this.dispatcher) :
+ new XceiverServer(datanodeDetails,
+ this.ozoneConfig, this.dispatcher),
XceiverServerRatis
.newXceiverServerRatis(datanodeDetails, this.ozoneConfig, dispatcher)
};
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 08d7176..9936815 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -220,13 +220,13 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
datanodeService.stop();
datanodeService.join();
// ensure same ports are used across restarts.
- Configuration conf = datanodeService.getConf();
+ Configuration config = datanodeService.getConf();
int currentPort = datanodeService.getDatanodeDetails().getContainerPort();
- conf.setInt(DFS_CONTAINER_IPC_PORT, currentPort);
- conf.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false);
+ config.setInt(DFS_CONTAINER_IPC_PORT, currentPort);
+ config.setBoolean(DFS_CONTAINER_IPC_RANDOM_PORT, false);
int ratisPort = datanodeService.getDatanodeDetails().getRatisPort();
- conf.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
- conf.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
+ config.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
+ config.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
datanodeService.start(null);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
index 07ad6ef..77e4e1b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.scm;
import com.google.common.cache.Cache;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -30,13 +31,17 @@ import org.apache.hadoop.hdds.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.junit.Assert;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
import static org.apache.hadoop.hdds.scm
.ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY;
@@ -44,19 +49,32 @@ import static org.apache.hadoop.hdds.scm
/**
* Test for XceiverClientManager caching and eviction.
*/
+@RunWith(Parameterized.class)
public class TestXceiverClientManager {
private static OzoneConfiguration config;
private static MiniOzoneCluster cluster;
private static StorageContainerLocationProtocolClientSideTranslatorPB
storageContainerLocationClient;
private static String containerOwner = "OZONE";
+ private final boolean shouldUseGrpc;
+ // Each test runs twice: once with gRPC disabled, once with it enabled.
+ @Parameterized.Parameters
+ public static Collection<Object[]> withGrpc() {
+ return Arrays.asList(new Object[][] {{false}, {true}});
+ }
+ // Kept per instance; the Parameterized runner builds one instance per run.
+ public TestXceiverClientManager(boolean useGrpc) {
+ this.shouldUseGrpc = useGrpc;
+ }
@Rule
public ExpectedException exception = ExpectedException.none();
- @BeforeClass
- public static void init() throws Exception {
+ @Before
+ public void init() throws Exception {
config = new OzoneConfiguration();
+ config.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+ shouldUseGrpc);
cluster = MiniOzoneCluster.newBuilder(config)
.setNumDatanodes(3)
.build();
@@ -65,8 +83,8 @@ public class TestXceiverClientManager {
.getStorageContainerLocationClient();
}
- @AfterClass
- public static void shutdown() {
+ @After
+ public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
@@ -76,6 +94,8 @@ public class TestXceiverClientManager {
@Test
public void testCaching() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+ shouldUseGrpc);
XceiverClientManager clientManager = new XceiverClientManager(conf);
ContainerInfo container1 = storageContainerLocationClient
@@ -106,6 +126,8 @@ public class TestXceiverClientManager {
public void testFreeByReference() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
+ conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+ shouldUseGrpc);
XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<Long, XceiverClientSpi> cache =
clientManager.getClientCache();
@@ -140,10 +162,18 @@ public class TestXceiverClientManager {
// After releasing the client, this connection should be closed
// and any container operations should fail
clientManager.releaseClient(client1);
- exception.expect(IOException.class);
- exception.expectMessage("This channel is not connected.");
- ContainerProtocolCalls.createContainer(client1,
- container1.getContainerID(), traceID1);
+ // The closed-channel error text differs between the two transports.
+ String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" :
+ "This channel is not connected.";
+ try {
+ ContainerProtocolCalls.createContainer(client1,
+ container1.getContainerID(), traceID1);
+ Assert.fail("Create container should throw exception on closed "
+ + "client");
+ } catch (Exception e) { // Assert.fail's AssertionError is not caught here
+ Assert.assertEquals(IOException.class, e.getClass());
+ Assert.assertTrue(e.getMessage().contains(expectedMessage));
+ }
clientManager.releaseClient(client2);
}
@@ -151,6 +181,8 @@ public class TestXceiverClientManager {
public void testFreeByEviction() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(SCM_CONTAINER_CLIENT_MAX_SIZE_KEY, 1);
+ conf.setBoolean(ScmConfigKeys.DFS_CONTAINER_GRPC_ENABLED_KEY,
+ shouldUseGrpc);
XceiverClientManager clientManager = new XceiverClientManager(conf);
Cache<Long, XceiverClientSpi> cache =
clientManager.getClientCache();
@@ -181,10 +213,17 @@ public class TestXceiverClientManager {
// Any container operation should now fail
String traceID2 = "trace" + RandomStringUtils.randomNumeric(4);
- exception.expect(IOException.class);
- exception.expectMessage("This channel is not connected.");
- ContainerProtocolCalls.createContainer(client1,
- container1.getContainerID(), traceID2);
+ String expectedMessage = shouldUseGrpc ? "Channel shutdown invoked" :
+ "This channel is not connected."; // error text depends on the transport
+ try {
+ ContainerProtocolCalls.createContainer(client1,
+ container1.getContainerID(), traceID2);
+ Assert.fail("Create container should throw exception on closed "
+ + "client");
+ } catch (Exception e) { // Assert.fail's AssertionError is not caught here
+ Assert.assertEquals(IOException.class, e.getClass());
+ Assert.assertTrue(e.getMessage().contains(expectedMessage));
+ }
clientManager.releaseClient(client2);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a914069/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index a916108..73c3f5b 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -107,6 +107,7 @@
<!-- Maven protoc compiler -->
<protobuf-maven-plugin.version>0.5.1</protobuf-maven-plugin.version>
<protobuf-compile.version>3.5.0</protobuf-compile.version>
+ <grpc.version>1.10.0</grpc.version>
<os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
<!-- define the Java language version used by the compiler -->
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org