You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2016/03/08 06:49:25 UTC

hadoop git commit: HDFS-9873. Ozone: Add container transport server. Contributed by Anu Engineer.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 19e57d52f -> ea2689f52


HDFS-9873. Ozone: Add container transport server. Contributed by Anu Engineer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ea2689f5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ea2689f5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ea2689f5

Branch: refs/heads/HDFS-7240
Commit: ea2689f52727ef9a4d37c53111cadd7f25c9859e
Parents: 19e57d5
Author: Chris Nauroth <cn...@apache.org>
Authored: Mon Mar 7 21:47:28 2016 -0800
Committer: Chris Nauroth <cn...@apache.org>
Committed: Mon Mar 7 21:47:28 2016 -0800

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-7240.txt           |  2 +
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |  3 +
 .../interfaces/ContainerDispatcher.java         | 44 ++++++++++
 .../transport/server/XceiverServer.java         | 92 ++++++++++++++++++++
 .../transport/server/XceiverServerHandler.java  | 80 +++++++++++++++++
 .../server/XceiverServerInitializer.java        | 61 +++++++++++++
 .../transport/server/TestContainerServer.java   | 60 +++++++++++++
 7 files changed, 342 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea2689f5/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
index 667bc93..d88e52a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
@@ -47,3 +47,5 @@
 
     HDFS-9848. Ozone: Add Ozone Client lib for volume handling.
     (Anu Engineer via cnauroth)
+
+    HDFS-9873. Ozone: Add container transport server (Anu Engineer via cnauroth)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea2689f5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 7d9cdd5..ba24866 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -25,6 +25,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public final class OzoneConfigKeys {
+  public static final String DFS_OZONE_CONTAINER_IPC_PORT =
+      "dfs.ozone.container.ipc";
+  public static final int DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT =  50011;
   public static final String DFS_STORAGE_LOCAL_ROOT =
       "dfs.ozone.localstorage.root";
   public static final String DFS_STORAGE_LOCAL_ROOT_DEFAULT = "/tmp/ozone";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea2689f5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java
new file mode 100644
index 0000000..f587b2a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.interfaces;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+
+import java.io.IOException;
+
+/**
+ * Dispatcher acts as the bridge between the transport layer and
+ * the actual container layer. This layer is capable of transforming
+ * protobuf objects into corresponding class and issue the function call
+ * into the lower layers.
+ *
+ * The reply from the request is dispatched to the client.
+ */
+public interface ContainerDispatcher {
+  /**
+   * Dispatches commands to container layer.
+   * @param msg - Command Request
+   * @return Command Response
+   * @throws IOException
+   */
+  ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg)
+      throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea2689f5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServer.java
new file mode 100644
index 0000000..66ffa93
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.transport.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
+
+/**
+ * Creates a netty server endpoint that acts as the communication layer for
+ * Ozone containers.
+ */
+public final class XceiverServer {
+  private final int port;
+  private final ContainerDispatcher storageContainer;
+
+  private EventLoopGroup bossGroup;
+  private EventLoopGroup workerGroup;
+  private Channel channel;
+
+  /**
+   * Constructs a netty server class.
+   *
+   * @param conf - Configuration
+   */
+  public XceiverServer(OzoneConfiguration conf,
+                       ContainerDispatcher dispatcher) {
+    Preconditions.checkNotNull(conf);
+    this.port = conf.getInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
+        OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT);
+    this.storageContainer = dispatcher;
+  }
+
+  /**
+   * Starts running the server.
+   *
+   * @throws Exception
+   */
+  public void start() throws Exception {
+    bossGroup = new NioEventLoopGroup();
+    workerGroup = new NioEventLoopGroup();
+    channel = new ServerBootstrap()
+        .group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .childHandler(new XceiverServerInitializer(storageContainer))
+        .bind(port)
+        .syncUninterruptibly()
+        .channel();
+  }
+
+  /**
+   * Stops a running server.
+   *
+   * @throws Exception
+   */
+  public void stop() throws Exception {
+    if (bossGroup != null) {
+      bossGroup.shutdownGracefully();
+    }
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully();
+    }
+    if (channel != null) {
+      channel.close().awaitUninterruptibly();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea2689f5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerHandler.java
new file mode 100644
index 0000000..887ad62
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerHandler.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.transport.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
+
+/**
+ * Netty server handlers that respond to Network events.
+ */
+public class XceiverServerHandler extends
+    SimpleChannelInboundHandler<ContainerCommandRequestProto> {
+
+  static final Logger LOG = LoggerFactory.getLogger(XceiverServerHandler.class);
+  private final ContainerDispatcher dispatcher;
+
+  /**
+   * Constructor for server handler.
+   * @param dispatcher - Dispatcher interface
+   */
+  public XceiverServerHandler(ContainerDispatcher dispatcher) {
+    this.dispatcher = dispatcher;
+  }
+
+  /**
+   * <strong>Please keep in mind that this method will be renamed to {@code
+   * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
+   * <p>
+   * Is called for each message of type {@link ContainerCommandRequestProto}.
+   *
+   * @param ctx the {@link ChannelHandlerContext} which this {@link
+   *            SimpleChannelInboundHandler} belongs to
+   * @param msg the message to handle
+   * @throws Exception is thrown if an error occurred
+   */
+  @Override
+  public void channelRead0(ChannelHandlerContext ctx,
+                           ContainerCommandRequestProto msg) throws
+      Exception {
+    ContainerCommandResponseProto response = this.dispatcher.dispatch(msg);
+    LOG.debug("Writing the reponse back to client.");
+    ctx.writeAndFlush(response);
+
+  }
+
+  /**
+   * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)}
+   * Sub-classes may override this method to change behavior.
+   *
+   * @param ctx   - Channel Handler Context
+   * @param cause - Exception
+   */
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+      throws Exception {
+    LOG.error("An exception caught in the pipeline : " + cause.toString());
+    super.exceptionCaught(ctx, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea2689f5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerInitializer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerInitializer.java
new file mode 100644
index 0000000..0e57855
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerInitializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.transport.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+
+/**
+ * Creates a channel for the XceiverServer.
+ */
+public class XceiverServerInitializer extends ChannelInitializer<SocketChannel>{
+  private final ContainerDispatcher dispatcher;
+  public XceiverServerInitializer(ContainerDispatcher dispatcher) {
+    Preconditions.checkNotNull(dispatcher);
+    this.dispatcher = dispatcher;
+  }
+
+  /**
+   * This method will be called once the Channel is registered. After
+   * the method returns this instance will be removed from the {@link
+   * ChannelPipeline}
+   *
+   * @param ch the  which was registered.
+   * @throws Exception is thrown if an error occurs. In that case the channel
+   * will be closed.
+   */
+  @Override
+  protected void initChannel(SocketChannel ch) throws Exception {
+    ChannelPipeline pipeline = ch.pipeline();
+    pipeline.addLast(new ProtobufVarint32FrameDecoder());
+    pipeline.addLast(new ProtobufDecoder(ContainerCommandRequestProto
+        .getDefaultInstance()));
+    pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
+    pipeline.addLast(new ProtobufEncoder());
+    pipeline.addLast(new XceiverServerHandler(dispatcher));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea2689f5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
new file mode 100644
index 0000000..37820eb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.transport.server;
+
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestContainerServer {
+
+  @Test
+  public void testPipeline() {
+    EmbeddedChannel channel = new EmbeddedChannel(new XceiverServerHandler(
+        new TestContainerDispatcher()));
+    ContainerCommandRequestProto request = ContainerCommandRequestProto
+        .getDefaultInstance();
+    channel.writeInbound(request);
+    Assert.assertTrue(channel.finish());
+    ContainerCommandResponseProto response = channel.readOutbound();
+    Assert.assertTrue(
+        ContainerCommandResponseProto.getDefaultInstance().equals(response));
+    channel.close();
+  }
+
+  private class TestContainerDispatcher implements ContainerDispatcher {
+    /**
+     * Dispatches commands to container layer.
+     *
+     * @param msg - Command Request
+     * @return Command Response
+     * @throws IOException
+     */
+    @Override
+    public ContainerCommandResponseProto
+    dispatch(ContainerCommandRequestProto msg) throws IOException {
+      return ContainerCommandResponseProto.getDefaultInstance();
+    }
+  }
+}