You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/03/11 23:23:05 UTC
[47/50] [abbrv] hadoop git commit: HDFS-9891. Ozone: Add container
transport client. Contributed by Anu Engineer.
HDFS-9891. Ozone: Add container transport client. Contributed by Anu Engineer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/979dfe4c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/979dfe4c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/979dfe4c
Branch: refs/heads/HDFS-7240
Commit: 979dfe4c2ef5775dfbe8dcbcb07d3138a7257707
Parents: b31a5d6
Author: Chris Nauroth <cn...@apache.org>
Authored: Tue Mar 8 10:29:27 2016 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Fri Mar 11 12:57:09 2016 -0800
----------------------------------------------------------------------
.../apache/hadoop/hdfs/protocol/DatanodeID.java | 53 ++++++++
.../src/main/proto/hdfs.proto | 1 +
.../hadoop-hdfs/CHANGES-HDFS-7240.txt | 2 +
.../ozone/container/helpers/Pipeline.java | 132 +++++++++++++++++++
.../transport/client/XceiverClient.java | 122 +++++++++++++++++
.../transport/client/XceiverClientHandler.java | 112 ++++++++++++++++
.../client/XceiverClientInitializer.java | 68 ++++++++++
.../ozone/container/ContainerTestHelper.java | 103 +++++++++++++++
.../transport/server/TestContainerServer.java | 71 ++++++++--
9 files changed, 650 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
index 5fd845d..30e946d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
/**
* This class represents the primary identifier for a Datanode.
@@ -49,6 +50,7 @@ public class DatanodeID implements Comparable<DatanodeID> {
private int infoSecurePort; // info server port
private int ipcPort; // IPC server port
private String xferAddr;
+ private int containerPort; // container server port.
/**
* UUID identifying a given datanode. For upgraded Datanodes this is the
@@ -274,4 +276,55 @@ public class DatanodeID implements Comparable<DatanodeID> {
public int compareTo(DatanodeID that) {
return getXferAddr().compareTo(that.getXferAddr());
}
+
+ /**
+  * Gets the port used by this datanode's container server.
+  *
+  * @return the container port
+  */
+ public int getContainerPort() {
+   return this.containerPort;
+ }
+
+ /**
+  * Records the port used by this datanode's container server.
+  *
+  * @param containerPort - port of the container server.
+  */
+ public void setContainerPort(int containerPort) {
+   this.containerPort = containerPort;
+ }
+
+ /**
+  * Builds a DatanodeID from its protocol buffer representation, including
+  * the container port.
+  *
+  * @param datanodeIDProto - protoBuf Message
+  * @return DataNodeID
+  */
+ public static DatanodeID getFromProtoBuf(HdfsProtos.DatanodeIDProto
+     datanodeIDProto) {
+   // The constructor takes (ipAddr, hostName, datanodeUuid, xferPort,
+   // infoPort, infoSecurePort, ipcPort); passing the UUID first would shift
+   // every string field into the wrong slot.
+   DatanodeID id = new DatanodeID(datanodeIDProto.getIpAddr(),
+       datanodeIDProto.getHostName(), datanodeIDProto.getDatanodeUuid(),
+       datanodeIDProto.getXferPort(), datanodeIDProto.getInfoPort(),
+       datanodeIDProto.getInfoSecurePort(), datanodeIDProto.getIpcPort());
+   id.setContainerPort(datanodeIDProto.getContainerPort());
+   return id;
+ }
+
+ /**
+  * Returns a DataNodeID protobuf message from a datanode ID. Every field of
+  * the message, including the container port, is populated from the
+  * corresponding getter of this object.
+  *
+  * @return HdfsProtos.DatanodeIDProto
+  */
+ public HdfsProtos.DatanodeIDProto getProtoBufMessage() {
+   HdfsProtos.DatanodeIDProto.Builder builder =
+       HdfsProtos.DatanodeIDProto.newBuilder();
+
+   // ipAddr must carry the bare IP address (getIpAddr), not the IPC
+   // address returned by getIpcAddr.
+   return builder.setDatanodeUuid(this.getDatanodeUuid())
+       .setIpAddr(this.getIpAddr())
+       .setHostName(this.getHostName())
+       .setXferPort(this.getXferPort())
+       .setInfoPort(this.getInfoPort())
+       .setInfoSecurePort(this.getInfoSecurePort())
+       .setIpcPort(this.getIpcPort())
+       .setContainerPort(this.getContainerPort())
+       .build();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 0db8a3f..f9b875c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -58,6 +58,7 @@ message DatanodeIDProto {
required uint32 infoPort = 5; // datanode http port
required uint32 ipcPort = 6; // ipc server port
optional uint32 infoSecurePort = 7 [default = 0]; // datanode https port
+ optional uint32 containerPort = 8 [default = 0]; // Ozone container protocol
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
index 30f28d2..9f4fcf5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7240.txt
@@ -41,3 +41,5 @@
(Anu Engineer via cnauroth)
HDFS-9873. Ozone: Add container transport server (Anu Engineer via cnauroth)
+
+ HDFS-9891. Ozone: Add container transport client (Anu Engineer via cnauroth)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java
new file mode 100644
index 0000000..d1bcc8d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A pipeline represents the group of machines over which a container lives.
+ * Members are indexed by their datanode UUID; the member whose UUID equals
+ * the leader ID is the leader.
+ */
+public class Pipeline {
+  private final String leaderID;
+  // Keyed by datanode UUID; the TreeMap keeps members ordered by UUID.
+  private final Map<String, DatanodeID> datanodes;
+  private String containerName;
+
+  /**
+   * Constructs a new pipeline data structure with no members.
+   *
+   * @param leaderID - UUID of the datanode that leads this pipeline.
+   */
+  public Pipeline(String leaderID) {
+    this.leaderID = leaderID;
+    datanodes = new TreeMap<>();
+  }
+
+  /**
+   * Gets pipeline object from protobuf.
+   *
+   * @param pipeline - ProtoBuf definition for the pipeline.
+   * @return Pipeline Object
+   */
+  public static Pipeline getFromProtoBuf(ContainerProtos.Pipeline pipeline) {
+    Preconditions.checkNotNull(pipeline);
+    Pipeline newPipeline = new Pipeline(pipeline.getLeaderID());
+    for (HdfsProtos.DatanodeIDProto dataID : pipeline.getMembersList()) {
+      newPipeline.addMember(DatanodeID.getFromProtoBuf(dataID));
+    }
+    if (pipeline.hasContainerName()) {
+      // Take the name from the message; the freshly built object has no
+      // name yet, so reading it back from there would always give null.
+      newPipeline.containerName = pipeline.getContainerName();
+    }
+    return newPipeline;
+  }
+
+  /**
+   * Adds a member to the pipeline. A member with the same datanode UUID
+   * replaces the one added earlier.
+   *
+   * @param dataNodeId - Datanode to be added.
+   */
+  public void addMember(DatanodeID dataNodeId) {
+    datanodes.put(dataNodeId.getDatanodeUuid(), dataNodeId);
+  }
+
+  /**
+   * Returns the leader of this pipeline, i.e. the member whose datanode
+   * UUID equals the leader ID given at construction.
+   *
+   * @return the leader, or null if no member with that UUID was added.
+   */
+  public DatanodeID getLeader() {
+    return datanodes.get(leaderID);
+  }
+
+  /**
+   * Returns all machines that make up this pipeline.
+   *
+   * @return a new list of the members, ordered by datanode UUID.
+   */
+  public List<DatanodeID> getMachines() {
+    return new ArrayList<>(datanodes.values());
+  }
+
+  /**
+   * Return a Protobuf Pipeline message from pipeline. The container name
+   * is only set on the message when this pipeline has one.
+   *
+   * @return Protobuf message
+   */
+  public ContainerProtos.Pipeline getProtobufMessage() {
+    ContainerProtos.Pipeline.Builder builder =
+        ContainerProtos.Pipeline.newBuilder();
+    for (DatanodeID datanode : datanodes.values()) {
+      builder.addMembers(datanode.getProtoBufMessage());
+    }
+    builder.setLeaderID(leaderID);
+    if (this.containerName != null) {
+      builder.setContainerName(this.containerName);
+    }
+    return builder.build();
+  }
+
+  /**
+   * Returns containerName if available.
+   *
+   * @return the container name, or null if none has been set.
+   */
+  public String getContainerName() {
+    return containerName;
+  }
+
+  /**
+   * Sets the container Name.
+   *
+   * @param containerName - Name of the container.
+   */
+  public void setContainerName(String containerName) {
+    this.containerName = containerName;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClient.java
new file mode 100644
index 0000000..0c2686d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClient.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.transport.client;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A Client for the storageContainer protocol. The client talks to the
+ * leader datanode of the pipeline it was created with.
+ */
+public class XceiverClient {
+  static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
+  private final Pipeline pipeline;
+  private final OzoneConfiguration config;
+  private ChannelFuture channelFuture;
+  private Bootstrap b;
+  private EventLoopGroup group;
+
+  /**
+   * Constructs a client that can communicate with the Container framework on
+   * data nodes.
+   * @param pipeline - Pipeline that defines the machines.
+   * @param config -- Ozone Config
+   */
+  public XceiverClient(Pipeline pipeline, OzoneConfiguration config) {
+    Preconditions.checkNotNull(pipeline);
+    Preconditions.checkNotNull(config);
+    this.pipeline = pipeline;
+    this.config = config;
+  }
+
+  /**
+   * Connects to the leader in the pipeline and waits until the connection
+   * attempt has completed.
+   *
+   * @throws IOException if the client is already connected or the pipeline
+   *                     has no leader member.
+   * @throws Exception   if the connection attempt fails or is interrupted.
+   */
+  public void connect() throws Exception {
+    if (channelFuture != null
+        && channelFuture.channel() != null
+        && channelFuture.channel().isActive()) {
+      throw new IOException("This client is already connected to a host.");
+    }
+
+    DatanodeID leader = this.pipeline.getLeader();
+    if (leader == null) {
+      // Fail with a clear message instead of a NullPointerException below.
+      throw new IOException("Pipeline does not contain its leader datanode.");
+    }
+
+    // Release whatever an earlier, no longer active connection left behind
+    // so that reconnecting does not leak the previous event loop group.
+    close();
+
+    group = new NioEventLoopGroup();
+    b = new Bootstrap();
+    // A Bootstrap keeps a single handler: a second handler() call replaces
+    // the first. Only the initializer is installed here; any extra handler
+    // (e.g. logging) has to be added to the channel pipeline it builds.
+    b.group(group)
+        .channel(NioSocketChannel.class)
+        .handler(new XceiverClientInitializer(this.pipeline));
+
+    // read port from the data node, on failure use default configured
+    // port.
+    int port = leader.getContainerPort();
+    if (port == 0) {
+      port = config.getInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
+          OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT);
+    }
+    LOG.debug("Connecting to server Port : {}", port);
+    channelFuture = b.connect(leader.getHostName(), port).sync();
+  }
+
+  /**
+   * Close the client. The channel is closed first, then the event loop
+   * group that serves it is shut down. Safe to call when not connected.
+   */
+  public void close() {
+    if (channelFuture != null) {
+      channelFuture.channel().close();
+    }
+
+    if (group != null) {
+      group.shutdownGracefully();
+    }
+  }
+
+  /**
+   * Sends a given command to server and gets the reply back. The call is
+   * delegated to the XceiverClientHandler installed on the channel.
+   * @param request Request
+   * @return Response to the command
+   * @throws IOException if the client is not connected.
+   */
+  public ContainerProtos.ContainerCommandResponseProto sendCommand(
+      ContainerProtos.ContainerCommandRequestProto request)
+      throws IOException {
+    if ((channelFuture == null) || (!channelFuture.channel().isActive())) {
+      throw new IOException("This channel is not connected.");
+    }
+    XceiverClientHandler handler =
+        channelFuture.channel().pipeline().get(XceiverClientHandler.class);
+
+    return handler.sendCommand(request);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientHandler.java
new file mode 100644
index 0000000..25624f4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.transport.client;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Netty inbound handler for the container client. Responses read from the
+ * channel are parked in a queue from which {@code sendCommand} hands them
+ * back to the caller.
+ */
+public class XceiverClientHandler extends
+    SimpleChannelInboundHandler<ContainerProtos.ContainerCommandResponseProto> {
+
+  static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
+  private final Pipeline pipeline;
+  private final BlockingQueue<ContainerProtos.ContainerCommandResponseProto>
+      responses = new LinkedBlockingQueue<>();
+  private volatile Channel channel;
+
+  /**
+   * Constructs a client that can communicate to a container server.
+   *
+   * @param pipeline - pipeline this handler was created for.
+   */
+  public XceiverClientHandler(Pipeline pipeline) {
+    // false: handled messages are not released automatically.
+    super(false);
+    this.pipeline = pipeline;
+  }
+
+  /**
+   * Called for every {@link ContainerProtos.ContainerCommandResponseProto}
+   * read from the channel; the message is appended to the response queue.
+   *
+   * @param ctx the {@link ChannelHandlerContext} which this handler
+   *            belongs to
+   * @param msg the response that was received
+   * @throws Exception is thrown if an error occurred
+   */
+  @Override
+  public void channelRead0(ChannelHandlerContext ctx,
+      ContainerProtos.ContainerCommandResponseProto msg)
+      throws Exception {
+    responses.add(msg);
+  }
+
+  /**
+   * Remembers the channel so that {@code sendCommand} can write to it.
+   *
+   * @param ctx context of the channel that was registered.
+   */
+  @Override
+  public void channelRegistered(ChannelHandlerContext ctx) {
+    LOG.debug("channelRegistered: Connected to ctx");
+    channel = ctx.channel();
+  }
+
+  /**
+   * Logs the failure and closes the channel.
+   *
+   * @param ctx   context of the failing channel.
+   * @param cause the error that was raised.
+   */
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    LOG.info("Exception in client " + cause.toString());
+    ctx.close();
+  }
+
+  /**
+   * Synchronous request/response helper on top of the asynchronous channel:
+   * the request is written and flushed, then the calling thread blocks,
+   * without a timeout, until a response shows up in the queue. Interrupts
+   * received while waiting do not abort the wait; the interrupt status is
+   * restored before returning. An async interface may be worth building if
+   * this becomes a performance bottleneck.
+   *
+   * @param request - request.
+   * @return -- response
+   */
+  public ContainerProtos.ContainerCommandResponseProto
+      sendCommand(ContainerProtos.ContainerCommandRequestProto request) {
+
+    channel.writeAndFlush(request);
+
+    ContainerProtos.ContainerCommandResponseProto reply = null;
+    boolean wasInterrupted = false;
+    // take() never yields null, so a non-null reply means we are done.
+    while (reply == null) {
+      try {
+        reply = responses.take();
+      } catch (InterruptedException ignored) {
+        // keep waiting; the flag is re-raised once a reply has arrived
+        wasInterrupted = true;
+      }
+    }
+
+    if (wasInterrupted) {
+      Thread.currentThread().interrupt();
+    }
+    return reply;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientInitializer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientInitializer.java
new file mode 100644
index 0000000..6951f28
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/transport/client/XceiverClientInitializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.transport.client;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+
+/**
+ * Builds the netty channel pipeline used by the container client.
+ */
+public class XceiverClientInitializer extends
+    ChannelInitializer<SocketChannel> {
+  private final Pipeline pipeline;
+
+  /**
+   * Constructs an Initializer for the client pipeline.
+   * @param pipeline - container pipeline handed to the client handler.
+   */
+  public XceiverClientInitializer(Pipeline pipeline) {
+    this.pipeline = pipeline;
+  }
+
+  /**
+   * Installs, in order: the varint32 frame decoder and protobuf decoder for
+   * incoming responses, the varint32 length prepender and protobuf encoder
+   * for outgoing requests, and finally the client handler.
+   *
+   * @param ch Channel which was registered.
+   * @throws Exception is thrown if an error occurs. In that case the
+   *                   Channel will be closed.
+   */
+  @Override
+  protected void initChannel(SocketChannel ch) throws Exception {
+    final ChannelPipeline channelPipeline = ch.pipeline();
+
+    channelPipeline
+        // inbound: split frames, then decode them as command responses
+        .addLast(new ProtobufVarint32FrameDecoder())
+        .addLast(new ProtobufDecoder(ContainerProtos
+            .ContainerCommandResponseProto.getDefaultInstance()))
+        // outbound: length-prefix and encode requests
+        .addLast(new ProtobufVarint32LengthFieldPrepender())
+        .addLast(new ProtobufEncoder())
+        // application logic
+        .addLast(new XceiverClientHandler(this.pipeline));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
new file mode 100644
index 0000000..0622c82
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+ .ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+ .ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.UUID;
+
+/**
+ * Helpers for container tests.
+ */
+public class ContainerTestHelper {
+
+  /**
+   * Create a pipeline with single node replica. The node's address and
+   * ports come from a server socket bound to an ephemeral port; the socket
+   * is closed before returning, so the port is free again (but not
+   * reserved) for the caller.
+   *
+   * @return Pipeline with single node in it.
+   * @throws IOException if the server socket cannot be opened or closed.
+   */
+  public static Pipeline createSingleNodePipeline() throws IOException {
+    // try-with-resources: the socket is released even if building the
+    // datanode ID or the pipeline throws.
+    try (ServerSocket socket = new ServerSocket(0)) {
+      int port = socket.getLocalPort();
+      DatanodeID datanodeID = new DatanodeID(socket.getInetAddress()
+          .getHostAddress(), socket.getInetAddress().getHostName(),
+          UUID.randomUUID().toString(), port, port, port, port);
+      datanodeID.setContainerPort(port);
+      Pipeline pipeline = new Pipeline(datanodeID.getDatanodeUuid());
+      pipeline.addMember(datanodeID);
+      return pipeline;
+    }
+  }
+
+  /**
+   * Returns a create container command for test purposes. There are a bunch of
+   * tests where we need to just send a request and get a reply. The request
+   * targets a container named "testContainer" on a fresh single node
+   * pipeline.
+   *
+   * @return ContainerCommandRequestProto.
+   * @throws IOException if the single node pipeline cannot be created.
+   */
+  public static ContainerCommandRequestProto getCreateContainerRequest() throws
+      IOException {
+    ContainerProtos.CreateContainerRequestProto.Builder createRequest =
+        ContainerProtos.CreateContainerRequestProto
+            .newBuilder();
+    ContainerProtos.ContainerData.Builder containerData = ContainerProtos
+        .ContainerData.newBuilder();
+    containerData.setName("testContainer");
+    createRequest.setPipeline(
+        ContainerTestHelper.createSingleNodePipeline().getProtobufMessage());
+    createRequest.setContainerData(containerData.build());
+
+    ContainerCommandRequestProto.Builder request =
+        ContainerCommandRequestProto.newBuilder();
+    request.setCmdType(ContainerProtos.Type.CreateContainer);
+    request.setCreateContainer(createRequest);
+    return request.build();
+  }
+
+  /**
+   * Returns a create container response for test purposes. There are a bunch of
+   * tests where we need to just send a request and get a reply. The response
+   * carries the trace ID of the given request.
+   *
+   * @param request - request that is being answered.
+   * @return ContainerCommandResponseProto.
+   * @throws IOException declared for callers; not thrown by this method.
+   */
+  public static ContainerCommandResponseProto
+      getCreateContainerResponse(ContainerCommandRequestProto request) throws
+      IOException {
+    ContainerProtos.CreateContainerResponseProto.Builder createResponse =
+        ContainerProtos.CreateContainerResponseProto.newBuilder();
+
+    ContainerCommandResponseProto.Builder response =
+        ContainerCommandResponseProto.newBuilder();
+    response.setCmdType(ContainerProtos.Type.CreateContainer);
+    response.setTraceID(request.getTraceID());
+    response.setCreateContainer(createResponse.build());
+    return response.build();
+  }
+
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/979dfe4c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
index 37820eb..f546a12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
@@ -19,9 +19,16 @@
package org.apache.hadoop.ozone.container.transport.server;
import io.netty.channel.embedded.EmbeddedChannel;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+ .ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+ .ContainerCommandResponseProto;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.helpers.Pipeline;
import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.transport.client.XceiverClient;
import org.junit.Assert;
import org.junit.Test;
@@ -30,17 +37,53 @@ import java.io.IOException;
public class TestContainerServer {
@Test
- public void testPipeline() {
- EmbeddedChannel channel = new EmbeddedChannel(new XceiverServerHandler(
- new TestContainerDispatcher()));
- ContainerCommandRequestProto request = ContainerCommandRequestProto
- .getDefaultInstance();
- channel.writeInbound(request);
- Assert.assertTrue(channel.finish());
- ContainerCommandResponseProto response = channel.readOutbound();
- Assert.assertTrue(
- ContainerCommandResponseProto.getDefaultInstance().equals(response));
- channel.close();
+ public void testPipeline() throws IOException {
+   // Pushes a create-container request through the server handler on an
+   // embedded channel and checks that the reply carries the same trace ID.
+   EmbeddedChannel channel = null;
+   try {
+     channel = new EmbeddedChannel(new XceiverServerHandler(
+         new TestContainerDispatcher()));
+     ContainerCommandRequestProto request =
+         ContainerTestHelper.getCreateContainerRequest();
+     channel.writeInbound(request);
+     Assert.assertTrue(channel.finish());
+     ContainerCommandResponseProto response = channel.readOutbound();
+     // Fail with a readable message rather than a NullPointerException.
+     Assert.assertNotNull(response);
+     // assertEquals reports both values when the IDs differ.
+     Assert.assertEquals(request.getTraceID(), response.getTraceID());
+   } finally {
+     if (channel != null) {
+       channel.close();
+     }
+   }
+ }
+
+ @Test
+ public void testClientServer() throws Exception {
+   // Starts a real server and client on the port chosen by the test
+   // pipeline and checks one request/response round trip.
+   XceiverServer server = null;
+   XceiverClient client = null;
+
+   try {
+     Pipeline pipeline = ContainerTestHelper.createSingleNodePipeline();
+     OzoneConfiguration conf = new OzoneConfiguration();
+     // The same port is put in the configuration given to the server.
+     conf.setInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
+         pipeline.getLeader().getContainerPort());
+
+     server = new XceiverServer(conf, new TestContainerDispatcher());
+     client = new XceiverClient(pipeline, conf);
+
+     server.start();
+     client.connect();
+
+     ContainerCommandRequestProto request =
+         ContainerTestHelper.getCreateContainerRequest();
+     ContainerCommandResponseProto response = client.sendCommand(request);
+     // Fail with a readable message rather than a NullPointerException.
+     Assert.assertNotNull(response);
+     // assertEquals reports both values when the IDs differ.
+     Assert.assertEquals(request.getTraceID(), response.getTraceID());
+   } finally {
+     if (client != null) {
+       client.close();
+     }
+     if (server != null) {
+       server.stop();
+     }
+   }
+ }
private class TestContainerDispatcher implements ContainerDispatcher {
@@ -54,7 +97,7 @@ public class TestContainerServer {
@Override
public ContainerCommandResponseProto
dispatch(ContainerCommandRequestProto msg) throws IOException {
- return ContainerCommandResponseProto.getDefaultInstance();
+ return ContainerTestHelper.getCreateContainerResponse(msg);
}
}
}