You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2016/03/08 06:49:25 UTC
hadoop git commit: HDFS-9873. Ozone: Add container transport server.
Contributed by Anu Engineer.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 19e57d52f -> ea2689f52
HDFS-9873. Ozone: Add container transport server. Contributed by Anu Engineer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ea2689f5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ea2689f5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ea2689f5
Branch: refs/heads/HDFS-7240
Commit: ea2689f52727ef9a4d37c53111cadd7f25c9859e
Parents: 19e57d5
Author: Chris Nauroth <cn...@apache.org>
Authored: Mon Mar 7 21:47:28 2016 -0800
Committer: Chris Nauroth <cn...@apache.org>
Committed: Mon Mar 7 21:47:28 2016 -0800
----------------------------------------------------------------------
.../hadoop-hdfs/CHANGES-HDFS-7240.txt | 2 +
.../apache/hadoop/ozone/OzoneConfigKeys.java | 3 +
.../interfaces/ContainerDispatcher.java | 44 ++++++++++
.../transport/server/XceiverServer.java | 92 ++++++++++++++++++++
.../transport/server/XceiverServerHandler.java | 80 +++++++++++++++++
.../server/XceiverServerInitializer.java | 61 +++++++++++++
.../transport/server/TestContainerServer.java | 60 +++++++++++++
7 files changed, 342 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea2689f5/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
index 667bc93..d88e52a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
@@ -47,3 +47,5 @@
HDFS-9848. Ozone: Add Ozone Client lib for volume handling.
(Anu Engineer via cnauroth)
+
+ HDFS-9873. Ozone: Add container transport server (Anu Engineer via cnauroth)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea2689f5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 7d9cdd5..ba24866 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -25,6 +25,9 @@ import org.apache.hadoop.classification.InterfaceAudience;
*/
@InterfaceAudience.Private
public final class OzoneConfigKeys {
+ public static final String DFS_OZONE_CONTAINER_IPC_PORT =
+ "dfs.ozone.container.ipc";
+ public static final int DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT = 50011;
public static final String DFS_STORAGE_LOCAL_ROOT =
"dfs.ozone.localstorage.root";
public static final String DFS_STORAGE_LOCAL_ROOT_DEFAULT = "/tmp/ozone";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea2689f5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java
new file mode 100644
index 0000000..f587b2a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.interfaces;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+
+import java.io.IOException;
+
+/**
+ * Dispatcher acts as the bridge between the transport layer and
+ * the actual container layer. This layer is capable of transforming
+ * protobuf objects into corresponding class and issue the function call
+ * into the lower layers.
+ *
+ * The reply from the request is dispatched to the client.
+ */
+public interface ContainerDispatcher {
+ /**
+ * Dispatches commands to container layer.
+ * @param msg - Command Request
+ * @return Command Response
+ * @throws IOException
+ */
+ ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg)
+ throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea2689f5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServer.java
new file mode 100644
index 0000000..66ffa93
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.transport.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
+
+/**
+ * Creates a netty server endpoint that acts as the communication layer for
+ * Ozone containers.
+ */
+public final class XceiverServer {
+ private final int port;
+ private final ContainerDispatcher storageContainer;
+
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private Channel channel;
+
+ /**
+ * Constructs a netty server class.
+ *
+ * @param conf - Configuration
+ */
+ public XceiverServer(OzoneConfiguration conf,
+ ContainerDispatcher dispatcher) {
+ Preconditions.checkNotNull(conf);
+ this.port = conf.getInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
+ OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT);
+ this.storageContainer = dispatcher;
+ }
+
+ /**
+ * Starts running the server.
+ *
+ * @throws Exception
+ */
+ public void start() throws Exception {
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup();
+ channel = new ServerBootstrap()
+ .group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new XceiverServerInitializer(storageContainer))
+ .bind(port)
+ .syncUninterruptibly()
+ .channel();
+ }
+
+ /**
+ * Stops a running server.
+ *
+ * @throws Exception
+ */
+ public void stop() throws Exception {
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ }
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ }
+ if (channel != null) {
+ channel.close().awaitUninterruptibly();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea2689f5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerHandler.java
new file mode 100644
index 0000000..887ad62
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerHandler.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.transport.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
+
+/**
+ * Netty server handlers that respond to Network events.
+ */
+public class XceiverServerHandler extends
+ SimpleChannelInboundHandler<ContainerCommandRequestProto> {
+
+ static final Logger LOG = LoggerFactory.getLogger(XceiverServerHandler.class);
+ private final ContainerDispatcher dispatcher;
+
+ /**
+ * Constructor for server handler.
+ * @param dispatcher - Dispatcher interface
+ */
+ public XceiverServerHandler(ContainerDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ /**
+ * <strong>Please keep in mind that this method will be renamed to {@code
+ * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
+ * <p>
+ * Is called for each message of type {@link ContainerCommandRequestProto}.
+ *
+ * @param ctx the {@link ChannelHandlerContext} which this {@link
+ * SimpleChannelInboundHandler} belongs to
+ * @param msg the message to handle
+ * @throws Exception is thrown if an error occurred
+ */
+ @Override
+ public void channelRead0(ChannelHandlerContext ctx,
+ ContainerCommandRequestProto msg) throws
+ Exception {
+ ContainerCommandResponseProto response = this.dispatcher.dispatch(msg);
+ LOG.debug("Writing the reponse back to client.");
+ ctx.writeAndFlush(response);
+
+ }
+
+ /**
+ * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)}
+ * Sub-classes may override this method to change behavior.
+ *
+ * @param ctx - Channel Handler Context
+ * @param cause - Exception
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ LOG.error("An exception caught in the pipeline : " + cause.toString());
+ super.exceptionCaught(ctx, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea2689f5/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerInitializer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerInitializer.java
new file mode 100644
index 0000000..0e57855
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/server/XceiverServerInitializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.transport.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+
+/**
+ * Creates a channel for the XceiverServer.
+ */
+public class XceiverServerInitializer extends ChannelInitializer<SocketChannel>{
+ private final ContainerDispatcher dispatcher;
+ public XceiverServerInitializer(ContainerDispatcher dispatcher) {
+ Preconditions.checkNotNull(dispatcher);
+ this.dispatcher = dispatcher;
+ }
+
+ /**
+ * This method will be called once the Channel is registered. After
+ * the method returns this instance will be removed from the {@link
+ * ChannelPipeline}
+ *
+ * @param ch the which was registered.
+ * @throws Exception is thrown if an error occurs. In that case the channel
+ * will be closed.
+ */
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast(new ProtobufVarint32FrameDecoder());
+ pipeline.addLast(new ProtobufDecoder(ContainerCommandRequestProto
+ .getDefaultInstance()));
+ pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
+ pipeline.addLast(new ProtobufEncoder());
+ pipeline.addLast(new XceiverServerHandler(dispatcher));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ea2689f5/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
new file mode 100644
index 0000000..37820eb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.transport.server;
+
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TestContainerServer {
+
+ @Test
+ public void testPipeline() {
+ EmbeddedChannel channel = new EmbeddedChannel(new XceiverServerHandler(
+ new TestContainerDispatcher()));
+ ContainerCommandRequestProto request = ContainerCommandRequestProto
+ .getDefaultInstance();
+ channel.writeInbound(request);
+ Assert.assertTrue(channel.finish());
+ ContainerCommandResponseProto response = channel.readOutbound();
+ Assert.assertTrue(
+ ContainerCommandResponseProto.getDefaultInstance().equals(response));
+ channel.close();
+ }
+
+ private class TestContainerDispatcher implements ContainerDispatcher {
+ /**
+ * Dispatches commands to container layer.
+ *
+ * @param msg - Command Request
+ * @return Command Response
+ * @throws IOException
+ */
+ @Override
+ public ContainerCommandResponseProto
+ dispatch(ContainerCommandRequestProto msg) throws IOException {
+ return ContainerCommandResponseProto.getDefaultInstance();
+ }
+ }
+}