You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2018/09/14 18:51:39 UTC

hadoop git commit: HDDS-389. Remove XceiverServer and XceiverClient and related classes. Contributed by chencan.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 446cb8301 -> c1df3084f


HDDS-389. Remove XceiverServer and XceiverClient and related classes. Contributed by chencan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c1df3084
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c1df3084
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c1df3084

Branch: refs/heads/trunk
Commit: c1df3084ffd062a41f05601b928430a0b1a0db47
Parents: 446cb83
Author: Nanda kumar <na...@apache.org>
Authored: Sat Sep 15 00:18:52 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Sat Sep 15 00:20:19 2018 +0530

----------------------------------------------------------------------
 .../apache/hadoop/hdds/scm/XceiverClient.java   | 209 -------------------
 .../hadoop/hdds/scm/XceiverClientHandler.java   | 202 ------------------
 .../hdds/scm/XceiverClientInitializer.java      |  74 -------
 .../common/transport/server/XceiverServer.java  | 140 -------------
 .../transport/server/XceiverServerHandler.java  |  82 --------
 .../server/XceiverServerInitializer.java        |  64 ------
 .../hadoop/ozone/TestMiniOzoneCluster.java      |   4 +-
 .../container/metrics/TestContainerMetrics.java |  21 +-
 .../container/server/TestContainerServer.java   |  54 ++---
 9 files changed, 36 insertions(+), 814 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
deleted file mode 100644
index 5f2fe26..0000000
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.ratis.shaded.io.netty.bootstrap.Bootstrap;
-import org.apache.ratis.shaded.io.netty.channel.Channel;
-import org.apache.ratis.shaded.io.netty.channel.EventLoopGroup;
-import org.apache.ratis.shaded.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.ratis.shaded.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.ratis.shaded.io.netty.handler.logging.LogLevel;
-import org.apache.ratis.shaded.io.netty.handler.logging.LoggingHandler;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-
-/**
- * A Client for the storageContainer protocol.
- */
-public class XceiverClient extends XceiverClientSpi {
-  static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
-  private final Pipeline pipeline;
-  private final Configuration config;
-  private Channel channel;
-  private Bootstrap b;
-  private EventLoopGroup group;
-  private final Semaphore semaphore;
-  private boolean closed = false;
-
-  /**
-   * Constructs a client that can communicate with the Container framework on
-   * data nodes.
-   *
-   * @param pipeline - Pipeline that defines the machines.
-   * @param config -- Ozone Config
-   */
-  public XceiverClient(Pipeline pipeline, Configuration config) {
-    super();
-    Preconditions.checkNotNull(pipeline);
-    Preconditions.checkNotNull(config);
-    this.pipeline = pipeline;
-    this.config = config;
-    this.semaphore =
-        new Semaphore(HddsClientUtils.getMaxOutstandingRequests(config));
-  }
-
-  @Override
-  public void connect() throws Exception {
-    if (closed) {
-      throw new IOException("This channel is not connected.");
-    }
-
-    if (channel != null && channel.isActive()) {
-      throw new IOException("This client is already connected to a host.");
-    }
-
-    group = new NioEventLoopGroup();
-    b = new Bootstrap();
-    b.group(group)
-        .channel(NioSocketChannel.class)
-        .handler(new LoggingHandler(LogLevel.INFO))
-        .handler(new XceiverClientInitializer(this.pipeline, semaphore));
-    DatanodeDetails leader = this.pipeline.getLeader();
-
-    // read port from the data node, on failure use default configured
-    // port.
-    int port = leader.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
-    if (port == 0) {
-      port = config.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
-          OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
-    }
-    LOG.debug("Connecting to server Port : " + port);
-    channel = b.connect(leader.getHostName(), port).sync().channel();
-  }
-
-  public void reconnect() throws IOException {
-    try {
-      connect();
-      if (channel == null || !channel.isActive()) {
-        throw new IOException("This channel is not connected.");
-      }
-    } catch (Exception e) {
-      LOG.error("Error while connecting: ", e);
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * Returns if the exceiver client connects to a server.
-   *
-   * @return True if the connection is alive, false otherwise.
-   */
-  @VisibleForTesting
-  public boolean isConnected() {
-    return channel.isActive();
-  }
-
-  @Override
-  public void close() {
-    closed = true;
-    if (group != null) {
-      group.shutdownGracefully().awaitUninterruptibly();
-    }
-  }
-
-  @Override
-  public Pipeline getPipeline() {
-    return pipeline;
-  }
-
-  @Override
-  public ContainerProtos.ContainerCommandResponseProto sendCommand(
-      ContainerProtos.ContainerCommandRequestProto request) throws IOException {
-    try {
-      if ((channel == null) || (!channel.isActive())) {
-        reconnect();
-      }
-      XceiverClientHandler handler =
-          channel.pipeline().get(XceiverClientHandler.class);
-
-      return handler.sendCommand(request);
-    } catch (ExecutionException | InterruptedException e) {
-      /**
-       * In case the netty channel handler throws an exception,
-       * the exception thrown will be wrapped within {@link ExecutionException}.
-       * Unwarpping here so that original exception gets passed
-       * to to the client.
-       */
-      if (e instanceof ExecutionException) {
-        Throwable cause = e.getCause();
-        if (cause instanceof IOException) {
-          throw (IOException) cause;
-        }
-      }
-      throw new IOException(
-          "Unexpected exception during execution:" + e.getMessage());
-    }
-  }
-
-  /**
-   * Sends a given command to server gets a waitable future back.
-   *
-   * @param request Request
-   * @return Response to the command
-   * @throws IOException
-   */
-  @Override
-  public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
-      sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request)
-      throws IOException, ExecutionException, InterruptedException {
-    if ((channel == null) || (!channel.isActive())) {
-      reconnect();
-    }
-    XceiverClientHandler handler =
-        channel.pipeline().get(XceiverClientHandler.class);
-    return handler.sendCommandAsync(request);
-  }
-
-  /**
-   * Create a pipeline.
-   */
-  @Override
-  public void createPipeline()
-      throws IOException {
-    // For stand alone pipeline, there is no notion called setup pipeline.
-  }
-
-  public void destroyPipeline() {
-    // For stand alone pipeline, there is no notion called destroy pipeline.
-  }
-
-  /**
-   * Returns pipeline Type.
-   *
-   * @return - Stand Alone as the type.
-   */
-  @Override
-  public HddsProtos.ReplicationType getPipelineType() {
-    return HddsProtos.ReplicationType.STAND_ALONE;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java
deleted file mode 100644
index 7c568f6..0000000
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientHandler.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hdds.scm;
-
-import com.google.common.base.Preconditions;
-import org.apache.ratis.shaded.io.netty.channel.Channel;
-import org.apache.ratis.shaded.io.netty.channel.ChannelHandlerContext;
-import org.apache.ratis.shaded.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-
-/**
- * Netty client handler.
- */
-public class XceiverClientHandler extends
-    SimpleChannelInboundHandler<ContainerCommandResponseProto> {
-
-  static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
-  private final ConcurrentMap<String, ResponseFuture> responses =
-      new ConcurrentHashMap<>();
-
-  private final Pipeline pipeline;
-  private volatile Channel channel;
-  private XceiverClientMetrics metrics;
-  private final Semaphore semaphore;
-
-  /**
-   * Constructs a client that can communicate to a container server.
-   */
-  public XceiverClientHandler(Pipeline pipeline, Semaphore semaphore) {
-    super(false);
-    Preconditions.checkNotNull(pipeline);
-    this.pipeline = pipeline;
-    this.metrics = XceiverClientManager.getXceiverClientMetrics();
-    this.semaphore = semaphore;
-  }
-
-  /**
-   * <strong>Please keep in mind that this method will be renamed to {@code
-   * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
-   * <p>
-   * Is called for each message of type {@link ContainerProtos
-   * .ContainerCommandResponseProto}.
-   *
-   * @param ctx the {@link ChannelHandlerContext} which this {@link
-   * SimpleChannelInboundHandler} belongs to
-   * @param msg the message to handle
-   * @throws Exception is thrown if an error occurred
-   */
-  @Override
-  public void channelRead0(ChannelHandlerContext ctx,
-      ContainerProtos.ContainerCommandResponseProto msg)
-      throws Exception {
-    Preconditions.checkNotNull(msg);
-    metrics.decrPendingContainerOpsMetrics(msg.getCmdType());
-
-    String key = msg.getTraceID();
-    ResponseFuture response = responses.remove(key);
-    semaphore.release();
-
-    if (response != null) {
-      response.getFuture().complete(msg);
-
-      long requestTime = response.getRequestTime();
-      metrics.addContainerOpsLatency(msg.getCmdType(),
-          Time.monotonicNowNanos() - requestTime);
-    } else {
-      LOG.error("A reply received for message that was not queued. trace " +
-          "ID: {}", msg.getTraceID());
-    }
-  }
-
-  @Override
-  public void channelRegistered(ChannelHandlerContext ctx) {
-    LOG.debug("channelRegistered: Connected to ctx");
-    channel = ctx.channel();
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    LOG.info("Exception in client " + cause.toString());
-    Iterator<String> keyIterator = responses.keySet().iterator();
-    while (keyIterator.hasNext()) {
-      ResponseFuture response = responses.remove(keyIterator.next());
-      response.getFuture().completeExceptionally(cause);
-      semaphore.release();
-    }
-    ctx.close();
-  }
-
-  /**
-   * Since netty is async, we send a work request and then wait until a response
-   * appears in the reply queue. This is simple sync interface for clients. we
-   * should consider building async interfaces for client if this turns out to
-   * be a performance bottleneck.
-   *
-   * @param request - request.
-   * @return -- response
-   */
-
-  public ContainerCommandResponseProto sendCommand(
-      ContainerProtos.ContainerCommandRequestProto request)
-      throws ExecutionException, InterruptedException {
-    Future<ContainerCommandResponseProto> future = sendCommandAsync(request);
-    return future.get();
-  }
-
-  /**
-   * SendCommandAsyc queues a command to the Netty Subsystem and returns a
-   * CompletableFuture. This Future is marked compeleted in the channelRead0
-   * when the call comes back.
-   * @param request - Request to execute
-   * @return CompletableFuture
-   */
-  public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
-      ContainerProtos.ContainerCommandRequestProto request)
-      throws InterruptedException {
-
-    // Throw an exception of request doesn't have traceId
-    if (StringUtils.isEmpty(request.getTraceID())) {
-      throw new IllegalArgumentException("Invalid trace ID");
-    }
-
-    // Setting the datanode ID in the commands, so that we can distinguish
-    // commands when the cluster simulator is running.
-    if(!request.hasDatanodeUuid()) {
-      throw new IllegalArgumentException("Invalid Datanode ID");
-    }
-
-    metrics.incrPendingContainerOpsMetrics(request.getCmdType());
-
-    CompletableFuture<ContainerCommandResponseProto> future
-        = new CompletableFuture<>();
-    ResponseFuture response = new ResponseFuture(future,
-        Time.monotonicNowNanos());
-    semaphore.acquire();
-    ResponseFuture previous = responses.putIfAbsent(
-        request.getTraceID(), response);
-    if (previous != null) {
-      LOG.error("Command with Trace already exists. Ignoring this command. " +
-              "{}. Previous Command: {}", request.getTraceID(),
-          previous.toString());
-      throw new IllegalStateException("Duplicate trace ID. Command with this " +
-          "trace ID is already executing. Please ensure that " +
-          "trace IDs are not reused. ID: " + request.getTraceID());
-    }
-
-    channel.writeAndFlush(request);
-    return response.getFuture();
-  }
-
-  /**
-   * Class wraps response future info.
-   */
-  static class ResponseFuture {
-    private final long requestTime;
-    private final CompletableFuture<ContainerCommandResponseProto> future;
-
-    ResponseFuture(CompletableFuture<ContainerCommandResponseProto> future,
-        long requestTime) {
-      this.future = future;
-      this.requestTime = requestTime;
-    }
-
-    public long getRequestTime() {
-      return requestTime;
-    }
-
-    public CompletableFuture<ContainerCommandResponseProto> getFuture() {
-      return future;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java
deleted file mode 100644
index 90e2f5a..0000000
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientInitializer.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hdds.scm;
-
-import org.apache.ratis.shaded.io.netty.channel.ChannelInitializer;
-import org.apache.ratis.shaded.io.netty.channel.ChannelPipeline;
-import org.apache.ratis.shaded.io.netty.channel.socket.SocketChannel;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf
-    .ProtobufVarint32FrameDecoder;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf
-    .ProtobufVarint32LengthFieldPrepender;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-
-import java.util.concurrent.Semaphore;
-
-/**
- * Setup the netty pipeline.
- */
-public class XceiverClientInitializer extends
-    ChannelInitializer<SocketChannel> {
-  private final Pipeline pipeline;
-  private final Semaphore semaphore;
-
-  /**
-   * Constructs an Initializer for the client pipeline.
-   * @param pipeline  - Pipeline.
-   */
-  public XceiverClientInitializer(Pipeline pipeline, Semaphore semaphore) {
-    this.pipeline = pipeline;
-    this.semaphore = semaphore;
-  }
-
-  /**
-   * This method will be called once when the Channel is registered. After
-   * the method returns this instance will be removed from the
-   * ChannelPipeline of the Channel.
-   *
-   * @param ch   Channel which was registered.
-   * @throws Exception is thrown if an error occurs. In that case the
-   *                   Channel will be closed.
-   */
-  @Override
-  protected void initChannel(SocketChannel ch) throws Exception {
-    ChannelPipeline p = ch.pipeline();
-
-    p.addLast(new ProtobufVarint32FrameDecoder());
-    p.addLast(new ProtobufDecoder(ContainerProtos
-        .ContainerCommandResponseProto.getDefaultInstance()));
-
-    p.addLast(new ProtobufVarint32LengthFieldPrepender());
-    p.addLast(new ProtobufEncoder());
-
-    p.addLast(new XceiverClientHandler(this.pipeline, this.semaphore));
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
deleted file mode 100644
index f866fcd..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.ratis.shaded.io.netty.bootstrap.ServerBootstrap;
-import org.apache.ratis.shaded.io.netty.channel.Channel;
-import org.apache.ratis.shaded.io.netty.channel.EventLoopGroup;
-import org.apache.ratis.shaded.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.ratis.shaded.io.netty.channel.socket.nio
-    .NioServerSocketChannel;
-import org.apache.ratis.shaded.io.netty.handler.logging.LogLevel;
-import org.apache.ratis.shaded.io.netty.handler.logging.LoggingHandler;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.net.SocketAddress;
-
-/**
- * Creates a netty server endpoint that acts as the communication layer for
- * Ozone containers.
- */
-public final class XceiverServer implements XceiverServerSpi {
-  private static final Logger
-      LOG = LoggerFactory.getLogger(XceiverServer.class);
-  private int port;
-  private final ContainerDispatcher storageContainer;
-
-  private EventLoopGroup bossGroup;
-  private EventLoopGroup workerGroup;
-  private Channel channel;
-
-  /**
-   * Constructs a netty server class.
-   *
-   * @param conf - Configuration
-   */
-  public XceiverServer(DatanodeDetails datanodeDetails, Configuration conf,
-                       ContainerDispatcher dispatcher) {
-    Preconditions.checkNotNull(conf);
-
-    this.port = conf.getInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
-        OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
-    // Get an available port on current node and
-    // use that as the container port
-    if (conf.getBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT,
-        OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT)) {
-      try (ServerSocket socket = new ServerSocket()) {
-        socket.setReuseAddress(true);
-        SocketAddress address = new InetSocketAddress(0);
-        socket.bind(address);
-        this.port = socket.getLocalPort();
-        LOG.info("Found a free port for the server : {}", this.port);
-      } catch (IOException e) {
-        LOG.error("Unable find a random free port for the server, "
-            + "fallback to use default port {}", this.port, e);
-      }
-    }
-    datanodeDetails.setPort(
-        DatanodeDetails.newPort(DatanodeDetails.Port.Name.STANDALONE, port));
-    this.storageContainer = dispatcher;
-  }
-
-  @Override
-  public int getIPCPort() {
-    return this.port;
-  }
-
-  /**
-   * Returns the Replication type supported by this end-point.
-   *
-   * @return enum -- {Stand_Alone, Ratis, Chained}
-   */
-  @Override
-  public HddsProtos.ReplicationType getServerType() {
-    return HddsProtos.ReplicationType.STAND_ALONE;
-  }
-
-  @Override
-  public void start() throws IOException {
-    bossGroup = new NioEventLoopGroup();
-    workerGroup = new NioEventLoopGroup();
-    channel = new ServerBootstrap()
-        .group(bossGroup, workerGroup)
-        .channel(NioServerSocketChannel.class)
-        .handler(new LoggingHandler(LogLevel.INFO))
-        .childHandler(new XceiverServerInitializer(storageContainer))
-        .bind(port)
-        .syncUninterruptibly()
-        .channel();
-  }
-
-  @Override
-  public void stop() {
-    if (storageContainer != null) {
-      storageContainer.shutdown();
-    }
-    if (bossGroup != null) {
-      bossGroup.shutdownGracefully();
-    }
-    if (workerGroup != null) {
-      workerGroup.shutdownGracefully();
-    }
-    if (channel != null) {
-      channel.close().awaitUninterruptibly();
-    }
-  }
-
-  @Override
-  public void submitRequest(ContainerCommandRequestProto request,
-      HddsProtos.PipelineID pipelineID) {
-    storageContainer.dispatch(request);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
deleted file mode 100644
index 3765299..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-import org.apache.ratis.shaded.io.netty.channel.ChannelHandlerContext;
-import org.apache.ratis.shaded.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandResponseProto;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Netty server handlers that respond to Network events.
- */
-public class XceiverServerHandler extends
-    SimpleChannelInboundHandler<ContainerCommandRequestProto> {
-
-  static final Logger LOG = LoggerFactory.getLogger(XceiverServerHandler.class);
-  private final ContainerDispatcher dispatcher;
-
-  /**
-   * Constructor for server handler.
-   * @param dispatcher - Dispatcher interface
-   */
-  public XceiverServerHandler(ContainerDispatcher dispatcher) {
-    this.dispatcher = dispatcher;
-  }
-
-  /**
-   * <strong>Please keep in mind that this method will be renamed to {@code
-   * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
-   * <p>
-   * Is called for each message of type {@link ContainerCommandRequestProto}.
-   *
-   * @param ctx the {@link ChannelHandlerContext} which this {@link
-   *            SimpleChannelInboundHandler} belongs to
-   * @param msg the message to handle
-   * @throws Exception is thrown if an error occurred
-   */
-  @Override
-  public void channelRead0(ChannelHandlerContext ctx,
-                           ContainerCommandRequestProto msg) throws
-      Exception {
-    ContainerCommandResponseProto response = this.dispatcher.dispatch(msg);
-    LOG.debug("Writing the reponse back to client.");
-    ctx.writeAndFlush(response);
-
-  }
-
-  /**
-   * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)}
-   * Sub-classes may override this method to change behavior.
-   *
-   * @param ctx   - Channel Handler Context
-   * @param cause - Exception
-   */
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-      throws Exception {
-    LOG.error("An exception caught in the pipeline : " + cause.toString());
-    super.exceptionCaught(ctx, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
deleted file mode 100644
index e405cf9..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.transport.server;
-
-import com.google.common.base.Preconditions;
-import org.apache.ratis.shaded.io.netty.channel.ChannelInitializer;
-import org.apache.ratis.shaded.io.netty.channel.ChannelPipeline;
-import org.apache.ratis.shaded.io.netty.channel.socket.SocketChannel;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf.ProtobufEncoder;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf
-    .ProtobufVarint32FrameDecoder;
-import org.apache.ratis.shaded.io.netty.handler.codec.protobuf
-    .ProtobufVarint32LengthFieldPrepender;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-
-/**
- * Creates a channel for the XceiverServer.
- */
-public class XceiverServerInitializer extends ChannelInitializer<SocketChannel>{
-  private final ContainerDispatcher dispatcher;
-  public XceiverServerInitializer(ContainerDispatcher dispatcher) {
-    Preconditions.checkNotNull(dispatcher);
-    this.dispatcher = dispatcher;
-  }
-
-  /**
-   * This method will be called once the Channel is registered. After
-   * the method returns this instance will be removed from the {@link
-   * ChannelPipeline}
-   *
-   * @param ch the  which was registered.
-   * @throws Exception is thrown if an error occurs. In that case the channel
-   * will be closed.
-   */
-  @Override
-  protected void initChannel(SocketChannel ch) throws Exception {
-    ChannelPipeline pipeline = ch.pipeline();
-    pipeline.addLast(new ProtobufVarint32FrameDecoder());
-    pipeline.addLast(new ProtobufDecoder(ContainerCommandRequestProto
-        .getDefaultInstance()));
-    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
-    pipeline.addLast(new ProtobufEncoder());
-    pipeline.addLast(new XceiverServerHandler(dispatcher));
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index e5bb373..302ea46 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachin
 import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.XceiverClient;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.test.PathUtils;
 import org.apache.hadoop.test.TestGenericTestUtils;
@@ -100,7 +100,7 @@ public class TestMiniOzoneCluster {
       pipeline.addMember(datanodeDetails);
 
       // Verify client is able to connect to the container
-      try (XceiverClient client = new XceiverClient(pipeline, conf)){
+      try (XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf)){
         client.connect();
         assertTrue(client.isConnected());
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
index 19b561a..aac908d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
@@ -24,6 +24,7 @@ import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -37,11 +38,12 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.XceiverClient;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
+import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
@@ -54,10 +56,16 @@ import java.util.UUID;
  */
 public class TestContainerMetrics {
 
+  private GrpcReplicationService createReplicationService(
+      ContainerSet containerSet) {
+    return new GrpcReplicationService(
+        new OnDemandContainerReplicationSource(containerSet));
+  }
+
   @Test
   public void testContainerMetrics() throws Exception {
-    XceiverServer server = null;
-    XceiverClient client = null;
+    XceiverServerGrpc server = null;
+    XceiverClientGrpc client = null;
     long containerID = ContainerTestHelper.getTestContainerID();
     String path = GenericTestUtils.getRandomizedTempPath();
 
@@ -81,8 +89,9 @@ public class TestContainerMetrics {
           volumeSet, null);
       dispatcher.setScmId(UUID.randomUUID().toString());
 
-      server = new XceiverServer(datanodeDetails, conf, dispatcher);
-      client = new XceiverClient(pipeline, conf);
+      server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,
+          createReplicationService(containerSet));
+      client = new XceiverClientGrpc(pipeline, conf);
 
       server.start();
       client.connect();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c1df3084/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index b89814e..de55d9e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
+import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
 import org.apache.ratis.shaded.io.netty.channel.embedded.EmbeddedChannel;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -36,12 +38,11 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.RatisTestHelper;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
-import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerHandler;
+import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerGrpc;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.hdds.scm.XceiverClient;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
@@ -70,43 +71,24 @@ public class TestContainerServer {
   static final String TEST_DIR
       = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
 
-  @Test
-  public void testPipeline() throws IOException {
-    EmbeddedChannel channel = null;
-    String containerName = OzoneUtils.getRequestID();
-    try {
-      channel = new EmbeddedChannel(new XceiverServerHandler(
-          new TestContainerDispatcher()));
-      ContainerCommandRequestProto request =
-          ContainerTestHelper.getCreateContainerRequest(
-              ContainerTestHelper.getTestContainerID(),
-              ContainerTestHelper.createSingleNodePipeline());
-      channel.writeInbound(request);
-      Assert.assertTrue(channel.finish());
-
-      Object responseObject = channel.readOutbound();
-      Assert.assertTrue(responseObject instanceof
-          ContainerCommandResponseProto);
-      ContainerCommandResponseProto  response =
-          (ContainerCommandResponseProto) responseObject;
-      Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
-    } finally {
-      if (channel != null) {
-        channel.close();
-      }
-    }
+  private GrpcReplicationService createReplicationService(
+      ContainerSet containerSet) {
+    return new GrpcReplicationService(
+        new OnDemandContainerReplicationSource(containerSet));
   }
 
   @Test
   public void testClientServer() throws Exception {
     DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
+    ContainerSet containerSet = new ContainerSet();
     runTestClientServer(1,
         (pipeline, conf) -> conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
             pipeline.getLeader()
                 .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
-        XceiverClient::new,
-        (dn, conf) -> new XceiverServer(datanodeDetails, conf,
-            new TestContainerDispatcher()),
+        XceiverClientGrpc::new,
+        (dn, conf) -> new XceiverServerGrpc(datanodeDetails, conf,
+            new TestContainerDispatcher(),
+            createReplicationService(containerSet)),
         (dn, p) -> {});
   }
 
@@ -193,8 +175,8 @@ public class TestContainerServer {
 
   @Test
   public void testClientServerWithContainerDispatcher() throws Exception {
-    XceiverServer server = null;
-    XceiverClient client = null;
+    XceiverServerGrpc server = null;
+    XceiverClientGrpc client = null;
 
     try {
       Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
@@ -203,12 +185,14 @@ public class TestContainerServer {
           pipeline.getLeader()
               .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue());
 
+      ContainerSet containerSet = new ContainerSet();
       HddsDispatcher dispatcher = new HddsDispatcher(
           conf, mock(ContainerSet.class), mock(VolumeSet.class), null);
       dispatcher.init();
       DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
-      server = new XceiverServer(datanodeDetails, conf, dispatcher);
-      client = new XceiverClient(pipeline, conf);
+      server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,
+          createReplicationService(containerSet));
+      client = new XceiverClientGrpc(pipeline, conf);
 
       server.start();
       client.connect();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org