You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2018/09/14 18:51:39 UTC
hadoop git commit: HDDS-389. Remove XceiverServer and XceiverClient
and related classes. Contributed by chencan.
Repository: hadoop
Updated Branches:
refs/heads/trunk 446cb8301 -> c1df3084f
HDDS-389. Remove XceiverServer and XceiverClient and related classes. Contributed by chencan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c1df3084
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c1df3084
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c1df3084
Branch: refs/heads/trunk
Commit: c1df3084ffd062a41f05601b928430a0b1a0db47
Parents: 446cb83
Author: Nanda kumar <na...@apache.org>
Authored: Sat Sep 15 00:18:52 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Sat Sep 15 00:20:19 2018 +0530
----------------------------------------------------------------------
.../apache/hadoop/hdds/scm/XceiverClient.java | 209 -------------------
.../hadoop/hdds/scm/XceiverClientHandler.java | 202 ------------------
.../hdds/scm/XceiverClientInitializer.java | 74 -------
.../common/transport/server/XceiverServer.java | 140 -------------
.../transport/server/XceiverServerHandler.java | 82 --------
.../server/XceiverServerInitializer.java | 64 ------
.../hadoop/ozone/TestMiniOzoneCluster.java | 4 +-
.../container/metrics/TestContainerMetrics.java | 21 +-
.../container/server/TestContainerServer.java | 54 ++---
9 files changed, 36 insertions(+), 814 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
deleted file mode 100644
index 5f2fe26..0000000
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.ratis.shaded.io.netty.bootstrap.Bootstrap;
-import org.apache.ratis.shaded.io.netty.channel.Channel;
-import org.apache.ratis.shaded.io.netty.channel.EventLoopGroup;
-import org.apache.ratis.shaded.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.ratis.shaded.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.ratis.shaded.io.netty.handler.logging.LogLevel;
-import org.apache.ratis.shaded.io.netty.handler.logging.LoggingHandler;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-
-/**
- * A Client for the storageContainer protocol.
- */
-public class XceiverClient extends XceiverClientSpi {
- static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
- private final Pipeline pipeline;
- private final Configuration config;
- private Channel channel;
- private Bootstrap b;
- private EventLoopGroup group;
- private final Semaphore semaphore;
- private boolean closed = false;
-
- /**
- * Constructs a client that can communicate with the Container framework on
- * data nodes.
- *
- * @param pipeline - Pipeline that defines the machines.
- * @param config -- Ozone Config
- */
- public XceiverClient(Pipeline pipeline, Configuration config) {
- super();
- Preconditions.checkNotNull(pipeline);
- Preconditions.checkNotNull(config);
- this.pipeline = pipeline;
- this.config = config;
- this.semaphore =
- new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
- }
-
- @Override
- public void connect() throws Exception {
- if (closed) {
- throw new IOException("This channel is not connected.");
- }
-
- if (channel != null && channel.isActive()) {
- throw new IOException("This client is already connected to a host.");
- }
-
- group = new NioEventLoopGroup();
- b = new Bootstrap();
- b.group(group)
- .channel(NioSocketChannel.class)
- .handler(new LoggingHandler(LogLevel.INFO))
- .handler(new XceiverClientInitializer(this.pipeline, semaphore));
- DatanodeDetails leader = this.pipeline.getLeader();
-
- // read port from the data node, on failure use default configured
- // port.
- int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
- if (port == 0) {
- port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
- OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
- }
- LOG.debug("Connecting to server Port : " + port);
- channel = b.connect(leader.getHostName(), port).sync().channel();
- }
-
- public void reconnect() throws IOException {
- try {
- connect();
- if (channel == null || !channel.isActive()) {
- throw new IOException("This channel is not connected.");
- }
- } catch (Exception e) {
- LOG.error("Error while connecting: ", e);
- throw new IOException(e);
- }
- }
-
- /**
- * Returns if the exceiver client connects to a server.
- *
- * @return True if the connection is alive, false otherwise.
- */
- @VisibleForTesting
- public boolean isConnected() {
- return channel.isActive();
- }
-
- @Override
- public void close() {
- closed = true;
- if (group != null) {
- group.shutdownGracefully().awaitUninterruptibly();
- }
- }
-
- @Override
- public Pipeline getPipeline() {
- return pipeline;
- }
-
- @Override
- public ContainerProtos.ContainerCommandResponseProto sendCommand(
- ContainerProtos.ContainerCommandRequestProto request) throws IOException {
- try {
- if ((channel == null) || (!channel.isActive())) {
- reconnect();
- }
- XceiverClientHandler handler =
- channel.pipeline().get(XceiverClientHandler.class);
-
- return handler.sendCommand(request);
- } catch (ExecutionException | InterruptedException e) {
- /**
- * In case the netty channel handler throws an exception,
- * the exception thrown will be wrapped within {@link ExecutionException}.
- * Unwarpping here so that original exception gets passed
- * to to the client.
- */
- if (e instanceof ExecutionException) {
- Throwable cause = e.getCause();
- if (cause instanceof IOException) {
- throw (IOException) cause;
- }
- }
- throw new IOException(
- "Unexpected exception during execution:" + e.getMessage());
- }
- }
-
- /**
- * Sends a given command to server gets a waitable future back.
- *
- * @param request Request
- * @return Response to the command
- * @throws IOException
- */
- @Override
- public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
- sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request)
- throws IOException, ExecutionException, InterruptedException {
- if ((channel == null) || (!channel.isActive())) {
- reconnect();
- }
- XceiverClientHandler handler =
- channel.pipeline().get(XceiverClientHandler.class);
- return handler.sendCommandAsync(request);
- }
-
- /**
- * Create a pipeline.
- */
- @Override
- public void createPipeline()
- throws IOException {
- // For stand alone pipeline, there is no notion called setup pipeline.
- }
-
- public void destroyPipeline() {
- // For stand alone pipeline, there is no notion called destroy pipeline.
- }
-
- /**
- * Returns pipeline Type.
- *
- * @return - Stand Alone as the type.
- */
- @Override
- public HddsProtos.ReplicationType getPipelineType() {
- return HddsProtos.ReplicationType.STAND_ALONE;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java
deleted file mode 100644
index 7c568f6..0000000
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm;
-
-import com.google.common.base.Preconditions;
-import org.apache.ratis.shaded.io.netty.channel.Channel;
-import org.apache.ratis.shaded.io.netty.channel.ChannelHandlerContext;
-import org.apache.ratis.shaded.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandResponseProto;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-
-/**
- * Netty client handler.
- */
-public class XceiverClientHandler extends
- SimpleChannelInboundHandler<ContainerCommandResponseProto> {
-
- static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
- private final ConcurrentMap<String, ResponseFuture> responses =
- new ConcurrentHashMap<>();
-
- private final Pipeline pipeline;
- private volatile Channel channel;
- private XceiverClientMetrics metrics;
- private final Semaphore semaphore;
-
- /**
- * Constructs a client that can communicate to a container server.
- */
- public XceiverClientHandler(Pipeline pipeline, Semaphore semaphore) {
- super(false);
- Preconditions.checkNotNull(pipeline);
- this.pipeline = pipeline;
- this.metrics = XceiverClientManager.getXceiverClientMetrics();
- this.semaphore = semaphore;
- }
-
- /**
- * <strong>Please keep in mind that this method will be renamed to {@code
- * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
- * <p>
- * Is called for each message of type {@link ContainerProtos
- * .ContainerCommandResponseProto}.
- *
- * @param ctx the {@link ChannelHandlerContext} which this {@link
- * SimpleChannelInboundHandler} belongs to
- * @param msg the message to handle
- * @throws Exception is thrown if an error occurred
- */
- @Override
- public void channelRead0(ChannelHandlerContext ctx,
- ContainerProtos.ContainerCommandResponseProto msg)
- throws Exception {
- Preconditions.checkNotNull(msg);
- metrics.decrPendingContainerOpsMetrics(msg.getCmdType());
-
- String key = msg.getTraceID();
- ResponseFuture response = responses.remove(key);
- semaphore.release();
-
- if (response != null) {
- response.getFuture().complete(msg);
-
- long requestTime = response.getRequestTime();
- metrics.addContainerOpsLatency(msg.getCmdType(),
- Time.monotonicNowNanos() - requestTime);
- } else {
- LOG.error("A reply received for message that was not queued. trace " +
- "ID: {}", msg.getTraceID());
- }
- }
-
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) {
- LOG.debug("channelRegistered: Connected to ctx");
- channel = ctx.channel();
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- LOG.info("Exception in client " + cause.toString());
- Iterator<String> keyIterator = responses.keySet().iterator();
- while (keyIterator.hasNext()) {
- ResponseFuture response = responses.remove(keyIterator.next());
- response.getFuture().completeExceptionally(cause);
- semaphore.release();
- }
- ctx.close();
- }
-
- /**
- * Since netty is async, we send a work request and then wait until a response
- * appears in the reply queue. This is simple sync interface for clients. we
- * should consider building async interfaces for client if this turns out to
- * be a performance bottleneck.
- *
- * @param request - request.
- * @return -- response
- */
-
- public ContainerCommandResponseProto sendCommand(
- ContainerProtos.ContainerCommandRequestProto request)
- throws ExecutionException, InterruptedException {
- Future<ContainerCommandResponseProto> future = sendCommandAsync(request);
- return future.get();
- }
-
- /**
- * SendCommandAsyc queues a command to the Netty Subsystem and returns a
- * CompletableFuture. This Future is marked compeleted in the channelRead0
- * when the call comes back.
- * @param request - Request to execute
- * @return CompletableFuture
- */
- public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
- ContainerProtos.ContainerCommandRequestProto request)
- throws InterruptedException {
-
- // Throw an exception of request doesn't have traceId
- if (StringUtils.isEmpty(request.getTraceID())) {
- throw new IllegalArgumentException("Invalid trace ID");
- }
-
- // Setting the datanode ID in the commands, so that we can distinguish
- // commands when the cluster simulator is running.
- if(!request.hasDatanodeUuid()) {
- throw new IllegalArgumentException("Invalid Datanode ID");
- }
-
- metrics.incrPendingContainerOpsMetrics(request.getCmdType());
-
- CompletableFuture<ContainerCommandResponseProto> future
- = new CompletableFuture<>();
- ResponseFuture response = new ResponseFuture(future,
- Time.monotonicNowNanos());
- semaphore.acquire();
- ResponseFuture previous = responses.putIfAbsent(
- request.getTraceID(), response);
- if (previous != null) {
- LOG.error("Command with Trace already exists. Ignoring this command. " +
- "{}. Previous Command: {}", request.getTraceID(),
- previous.toString());
- throw new IllegalStateException("Duplicate trace ID. Command with this " +
- "trace ID is already executing. Please ensure that " +
- "trace IDs are not reused. ID: " + request.getTraceID());
- }
-
- channel.writeAndFlush(request);
- return response.getFuture();
- }
-
- /**
- * Class wraps response future info.
- */
- static class ResponseFuture {
- private final long requestTime;
- private final CompletableFuture<ContainerCommandResponseProto> future;
-
- ResponseFuture(CompletableFuture<ContainerCommandResponseProto> future,
- long requestTime) {
- this.future = future;
- this.requestTime = requestTime;
- }
-
- public long getRequestTime() {
- return requestTime;
- }
-
- public CompletableFuture<ContainerCommandResponseProto> getFuture() {
- return future;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java
deleted file mode 100644
index 90e2f5a..0000000
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm;
-
-import org.apache.ratis.shaded.io.netty.channel.ChannelInitializer;
-import org.apache.ratis.shaded.io.netty.channel.ChannelPipeline;
-import org.apache.ratis.shaded.io.netty.channel.socket.SocketChannel;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf
- .ProtobufVarint32FrameDecoder;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf
- .ProtobufVarint32LengthFieldPrepender;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-
-import java.util.concurrent.Semaphore;
-
-/**
- * Setup the netty pipeline.
- */
-public class XceiverClientInitializer extends
- ChannelInitializer<SocketChannel> {
- private final Pipeline pipeline;
- private final Semaphore semaphore;
-
- /**
- * Constructs an Initializer for the client pipeline.
- * @param pipeline - Pipeline.
- */
- public XceiverClientInitializer(Pipeline pipeline, Semaphore semaphore) {
- this.pipeline = pipeline;
- this.semaphore = semaphore;
- }
-
- /**
- * This method will be called once when the Channel is registered. After
- * the method returns this instance will be removed from the
- * ChannelPipeline of the Channel.
- *
- * @param ch Channel which was registered.
- * @throws Exception is thrown if an error occurs. In that case the
- * Channel will be closed.
- */
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline p = ch.pipeline();
-
- p.addLast(new ProtobufVarint32FrameDecoder());
- p.addLast(new ProtobufDecoder(ContainerProtos
- .ContainerCommandResponseProto.getDefaultInstance()));
-
- p.addLast(new ProtobufVarint32LengthFieldPrepender());
- p.addLast(new ProtobufEncoder());
-
- p.addLast(new XceiverClientHandler(this.pipeline, this.semaphore));
-
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
deleted file mode 100644
index f866fcd..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandRequestProto;
-import org.apache.ratis.shaded.io.netty.bootstrap.ServerBootstrap;
-import org.apache.ratis.shaded.io.netty.channel.Channel;
-import org.apache.ratis.shaded.io.netty.channel.EventLoopGroup;
-import org.apache.ratis.shaded.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.ratis.shaded.io.netty.channel.socket.nio
- .NioServerSocketChannel;
-import org.apache.ratis.shaded.io.netty.handler.logging.LogLevel;
-import org.apache.ratis.shaded.io.netty.handler.logging.LoggingHandler;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketAddress;
-
-/**
- * Creates a netty server endpoint that acts as the communication layer for
- * Ozone containers.
- */
-public final class XceiverServer implements XceiverServerSpi {
- private static final Logger
- LOG = LoggerFactory.getLogger(XceiverServer.class);
- private int port;
- private final ContainerDispatcher storageContainer;
-
- private EventLoopGroup bossGroup;
- private EventLoopGroup workerGroup;
- private Channel channel;
-
- /**
- * Constructs a netty server class.
- *
- * @param conf - Configuration
- */
- public XceiverServer(DatanodeDetails datanodeDetails, Configuration conf,
- ContainerDispatcher dispatcher) {
- Preconditions.checkNotNull(conf);
-
- this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
- OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
- // Get an available port on current node and
- // use that as the container port
- if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
- OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
- try (ServerSocket socket = new ServerSocket()) {
- socket.setReuseAddress(true);
- SocketAddress address = new InetSocketAddress(0);
- socket.bind(address);
- this.port = socket.getLocalPort();
- LOG.info("Found a free port for the server : {}", this.port);
- } catch (IOException e) {
- LOG.error("Unable find a random free port for the server, "
- + "fallback to use default port {}", this.port, e);
- }
- }
- datanodeDetails.setPort(
- DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port));
- this.storageContainer = dispatcher;
- }
-
- @Override
- public int getIPCPort() {
- return this.port;
- }
-
- /**
- * Returns the Replication type supported by this end-point.
- *
- * @return enum -- {Stand_Alone, Ratis, Chained}
- */
- @Override
- public HddsProtos.ReplicationType getServerType() {
- return HddsProtos.ReplicationType.STAND_ALONE;
- }
-
- @Override
- public void start() throws IOException {
- bossGroup = new NioEventLoopGroup();
- workerGroup = new NioEventLoopGroup();
- channel = new ServerBootstrap()
- .group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .handler(new LoggingHandler(LogLevel.INFO))
- .childHandler(new XceiverServerInitializer(storageContainer))
- .bind(port)
- .syncUninterruptibly()
- .channel();
- }
-
- @Override
- public void stop() {
- if (storageContainer != null) {
- storageContainer.shutdown();
- }
- if (bossGroup != null) {
- bossGroup.shutdownGracefully();
- }
- if (workerGroup != null) {
- workerGroup.shutdownGracefully();
- }
- if (channel != null) {
- channel.close().awaitUninterruptibly();
- }
- }
-
- @Override
- public void submitRequest(ContainerCommandRequestProto request,
- HddsProtos.PipelineID pipelineID) {
- storageContainer.dispatch(request);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
deleted file mode 100644
index 3765299..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-import org.apache.ratis.shaded.io.netty.channel.ChannelHandlerContext;
-import org.apache.ratis.shaded.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandResponseProto;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Netty server handlers that respond to Network events.
- */
-public class XceiverServerHandler extends
- SimpleChannelInboundHandler<ContainerCommandRequestProto> {
-
- static final Logger LOG = LoggerFactory.getLogger(XceiverServerHandler.class);
- private final ContainerDispatcher dispatcher;
-
- /**
- * Constructor for server handler.
- * @param dispatcher - Dispatcher interface
- */
- public XceiverServerHandler(ContainerDispatcher dispatcher) {
- this.dispatcher = dispatcher;
- }
-
- /**
- * <strong>Please keep in mind that this method will be renamed to {@code
- * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
- * <p>
- * Is called for each message of type {@link ContainerCommandRequestProto}.
- *
- * @param ctx the {@link ChannelHandlerContext} which this {@link
- * SimpleChannelInboundHandler} belongs to
- * @param msg the message to handle
- * @throws Exception is thrown if an error occurred
- */
- @Override
- public void channelRead0(ChannelHandlerContext ctx,
- ContainerCommandRequestProto msg) throws
- Exception {
- ContainerCommandResponseProto response = this.dispatcher.dispatch(msg);
- LOG.debug("Writing the reponse back to client.");
- ctx.writeAndFlush(response);
-
- }
-
- /**
- * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)}
- * Sub-classes may override this method to change behavior.
- *
- * @param ctx - Channel Handler Context
- * @param cause - Exception
- */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
- throws Exception {
- LOG.error("An exception caught in the pipeline : " + cause.toString());
- super.exceptionCaught(ctx, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
deleted file mode 100644
index e405cf9..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-import com.google.common.base.Preconditions;
-import org.apache.ratis.shaded.io.netty.channel.ChannelInitializer;
-import org.apache.ratis.shaded.io.netty.channel.ChannelPipeline;
-import org.apache.ratis.shaded.io.netty.channel.socket.SocketChannel;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf
- .ProtobufVarint32FrameDecoder;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf
- .ProtobufVarint32LengthFieldPrepender;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandRequestProto;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-
-/**
- * Creates a channel for the XceiverServer.
- */
-public class XceiverServerInitializer extends ChannelInitializer<SocketChannel>{
- private final ContainerDispatcher dispatcher;
- public XceiverServerInitializer(ContainerDispatcher dispatcher) {
- Preconditions.checkNotNull(dispatcher);
- this.dispatcher = dispatcher;
- }
-
- /**
- * This method will be called once the Channel is registered. After
- * the method returns this instance will be removed from the {@link
- * ChannelPipeline}
- *
- * @param ch the which was registered.
- * @throws Exception is thrown if an error occurs. In that case the channel
- * will be closed.
- */
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- pipeline.addLast(new ProtobufVarint32FrameDecoder());
- pipeline.addLast(new ProtobufDecoder(ContainerCommandRequestProto
- .getDefaultInstance()));
- pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
- pipeline.addLast(new ProtobufEncoder());
- pipeline.addLast(new XceiverServerHandler(dispatcher));
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index e5bb373..302ea46 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachin
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.XceiverClient;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.test.TestGenericTestUtils;
@@ -100,7 +100,7 @@ public class TestMiniOzoneCluster {
pipeline.addMember(datanodeDetails);
// Verify client is able to connect to the container
- try (XceiverClient client = new XceiverClient(pipeline, conf)){
+ try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)){
client.connect();
assertTrue(client.isConnected());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
index 19b561a..aac908d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -37,11 +38,12 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.XceiverClient;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
+import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -54,10 +56,16 @@ import java.util.UUID;
*/
public class TestContainerMetrics {
+ private GrpcReplicationService createReplicationService(
+ ContainerSet containerSet) {
+ return new GrpcReplicationService(
+ new OnDemandContainerReplicationSource(containerSet));
+ }
+
@Test
public void testContainerMetrics() throws Exception {
- XceiverServer server = null;
- XceiverClient client = null;
+ XceiverServerGrpc server = null;
+ XceiverClientGrpc client = null;
long containerID = ContainerTestHelper.getTestContainerID();
String path = GenericTestUtils.getRandomizedTempPath();
@@ -81,8 +89,9 @@ public class TestContainerMetrics {
volumeSet, null);
dispatcher.setScmId(UUID.randomUUID().toString());
- server = new XceiverServer(datanodeDetails, conf, dispatcher);
- client = new XceiverClient(pipeline, conf);
+ server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,
+ createReplicationService(containerSet));
+ client = new XceiverClientGrpc(pipeline, conf);
server.start();
client.connect();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index b89814e..de55d9e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
+import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
import org.apache.ratis.shaded.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -36,12 +38,11 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.RatisTestHelper;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
-import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerHandler;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.hdds.scm.XceiverClient;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
@@ -70,43 +71,24 @@ public class TestContainerServer {
static final String TEST_DIR
= GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
- @Test
- public void testPipeline() throws IOException {
- EmbeddedChannel channel = null;
- String containerName = OzoneUtils.getRequestID();
- try {
- channel = new EmbeddedChannel(new XceiverServerHandler(
- new TestContainerDispatcher()));
- ContainerCommandRequestProto request =
- ContainerTestHelper.getCreateContainerRequest(
- ContainerTestHelper.getTestContainerID(),
- ContainerTestHelper.createSingleNodePipeline());
- channel.writeInbound(request);
- Assert.assertTrue(channel.finish());
-
- Object responseObject = channel.readOutbound();
- Assert.assertTrue(responseObject instanceof
- ContainerCommandResponseProto);
- ContainerCommandResponseProto response =
- (ContainerCommandResponseProto) responseObject;
- Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
- } finally {
- if (channel != null) {
- channel.close();
- }
- }
+ private GrpcReplicationService createReplicationService(
+ ContainerSet containerSet) {
+ return new GrpcReplicationService(
+ new OnDemandContainerReplicationSource(containerSet));
}
@Test
public void testClientServer() throws Exception {
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
+ ContainerSet containerSet = new ContainerSet();
runTestClientServer(1,
(pipeline, conf) -> conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
pipeline.getLeader()
.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
- XceiverClient::new,
- (dn, conf) -> new XceiverServer(datanodeDetails, conf,
- new TestContainerDispatcher()),
+ XceiverClientGrpc::new,
+ (dn, conf) -> new XceiverServerGrpc(datanodeDetails, conf,
+ new TestContainerDispatcher(),
+ createReplicationService(containerSet)),
(dn, p) -> {});
}
@@ -193,8 +175,8 @@ public class TestContainerServer {
@Test
public void testClientServerWithContainerDispatcher() throws Exception {
- XceiverServer server = null;
- XceiverClient client = null;
+ XceiverServerGrpc server = null;
+ XceiverClientGrpc client = null;
try {
Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
@@ -203,12 +185,14 @@ public class TestContainerServer {
pipeline.getLeader()
.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue());
+ ContainerSet containerSet = new ContainerSet();
HddsDispatcher dispatcher = new HddsDispatcher(
conf, mock(ContainerSet.class), mock(VolumeSet.class), null);
dispatcher.init();
DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
- server = new XceiverServer(datanodeDetails, conf, dispatcher);
- client = new XceiverClient(pipeline, conf);
+ server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,
+ createReplicationService(containerSet));
+ client = new XceiverClientGrpc(pipeline, conf);
server.start();
client.connect();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org