You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2016/03/18 21:54:36 UTC
[2/2] hadoop git commit: HDFS-10180. Ozone: Refactor container
Namespace. Contributed by Anu Engineer.
HDFS-10180. Ozone: Refactor container Namespace. Contributed by Anu Engineer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c73a32c2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c73a32c2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c73a32c2
Branch: refs/heads/HDFS-7240
Commit: c73a32c21cad568915860cc200213d6c4c8408cf
Parents: 37e3a36
Author: Chris Nauroth <cn...@apache.org>
Authored: Fri Mar 18 13:46:56 2016 -0700
Committer: Chris Nauroth <cn...@apache.org>
Committed: Fri Mar 18 13:46:56 2016 -0700
----------------------------------------------------------------------
.../container/common/helpers/ContainerData.java | 170 +++++++++++++++++
.../common/helpers/ContainerUtils.java | 110 +++++++++++
.../container/common/helpers/Pipeline.java | 132 +++++++++++++
.../container/common/helpers/package-info.java | 21 +++
.../ozone/container/common/impl/Dispatcher.java | 189 +++++++++++++++++++
.../container/common/impl/package-info.java | 22 +++
.../common/interfaces/ContainerDispatcher.java | 44 +++++
.../common/interfaces/ContainerManager.java | 75 ++++++++
.../ozone/container/common/package-info.java | 28 +++
.../common/transport/client/XceiverClient.java | 122 ++++++++++++
.../transport/client/XceiverClientHandler.java | 112 +++++++++++
.../client/XceiverClientInitializer.java | 68 +++++++
.../common/transport/server/XceiverServer.java | 92 +++++++++
.../transport/server/XceiverServerHandler.java | 80 ++++++++
.../server/XceiverServerInitializer.java | 61 ++++++
.../ozone/container/helpers/ContainerData.java | 170 -----------------
.../ozone/container/helpers/ContainerUtils.java | 110 -----------
.../ozone/container/helpers/Pipeline.java | 132 -------------
.../ozone/container/helpers/package-info.java | 21 ---
.../interfaces/ContainerDispatcher.java | 44 -----
.../container/interfaces/ContainerManager.java | 75 --------
.../ozone/container/ozoneimpl/Dispatcher.java | 185 ------------------
.../ozone/container/ozoneimpl/package-info.java | 22 ---
.../transport/client/XceiverClient.java | 122 ------------
.../transport/client/XceiverClientHandler.java | 112 -----------
.../client/XceiverClientInitializer.java | 68 -------
.../transport/server/XceiverServer.java | 92 ---------
.../transport/server/XceiverServerHandler.java | 80 --------
.../server/XceiverServerInitializer.java | 61 ------
.../ozone/container/ContainerTestHelper.java | 2 +-
.../transport/server/TestContainerServer.java | 13 +-
31 files changed, 1335 insertions(+), 1300 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
new file mode 100644
index 0000000..8f5120a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * This class maintains the information about a container in the ozone world.
+ * <p>
+ * A container is a name, along with metadata- which is a set of key value
+ * pair.
+ */
+public class ContainerData {
+
+ private final String containerName;
+ private final Map<String, String> metadata;
+
+ private String path;
+
+ /**
+ * Constructs a ContainerData Object.
+ *
+ * @param containerName - Name
+ */
+ public ContainerData(String containerName) {
+ this.metadata = new TreeMap<>();
+ this.containerName = containerName;
+ }
+
+ /**
+ * Constructs a ContainerData object from ProtoBuf classes.
+ *
+ * @param protoData - ProtoBuf Message
+ * @throws IOException
+ */
+ public static ContainerData getFromProtBuf(
+ ContainerProtos.ContainerData protoData) throws IOException {
+ ContainerData data = new ContainerData(protoData.getName());
+ for (int x = 0; x < protoData.getMetadataCount(); x++) {
+ data.addMetadata(protoData.getMetadata(x).getKey(),
+ protoData.getMetadata(x).getValue());
+ }
+
+ if (protoData.hasContainerPath()) {
+ data.setPath(protoData.getContainerPath());
+ }
+ return data;
+ }
+
+ /**
+ * Returns a ProtoBuf Message from ContainerData.
+ *
+ * @return Protocol Buffer Message
+ */
+ public ContainerProtos.ContainerData getProtoBufMessage() {
+ ContainerProtos.ContainerData.Builder builder = ContainerProtos
+ .ContainerData.newBuilder();
+ builder.setName(this.getContainerName());
+ if (this.getPath() != null) {
+ builder.setContainerPath(this.getPath());
+ }
+ for (Map.Entry<String, String> entry : metadata.entrySet()) {
+ ContainerProtos.KeyValue.Builder keyValBuilder =
+ ContainerProtos.KeyValue.newBuilder();
+ builder.addMetadata(keyValBuilder.setKey(entry.getKey())
+ .setValue(entry.getValue()).build());
+ }
+ return builder.build();
+ }
+
+ /**
+ * Returns the name of the container.
+ *
+ * @return - name
+ */
+ public String getContainerName() {
+ return containerName;
+ }
+
+ /**
+ * Adds metadata.
+ */
+ public void addMetadata(String key, String value) throws IOException {
+ synchronized (this.metadata) {
+ if (this.metadata.containsKey(key)) {
+ throw new IOException("This key already exists. Key " + key);
+ }
+ metadata.put(key, value);
+ }
+ }
+
+ /**
+ * Returns all metadata.
+ */
+ public Map<String, String> getAllMetadata() {
+ synchronized (this.metadata) {
+ return Collections.unmodifiableMap(this.metadata);
+ }
+ }
+
+ /**
+ * Returns value of a key.
+ */
+ public String getValue(String key) {
+ synchronized (this.metadata) {
+ return metadata.get(key);
+ }
+ }
+
+ /**
+ * Deletes a metadata entry from the map.
+ *
+ * @param key - Key
+ */
+ public void deleteKey(String key) {
+ synchronized (this.metadata) {
+ metadata.remove(key);
+ }
+ }
+
+ /**
+ * Returns path.
+ *
+ * @return - path
+ */
+ public String getPath() {
+ return path;
+ }
+
+ /**
+ * Sets path.
+ *
+ * @param path - String.
+ */
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ /**
+ * This function serves as the generic key for OzoneCache class. Both
+ * ContainerData and ContainerKeyData overrides this function to appropriately
+ * return the right name that can be used in OzoneCache.
+ *
+ * @return String Name.
+ */
+ public String getName() {
+ return getContainerName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
new file mode 100644
index 0000000..23e1804
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+
+/**
+ * A set of helper functions to create proper responses.
+ */
+public final class ContainerUtils {
+
+ /**
+ * Returns a CreateContainer Response. This call is used by create and delete
+ * containers which have null success responses.
+ *
+ * @param msg Request
+ * @return Response.
+ */
+ public static ContainerProtos.ContainerCommandResponseProto
+ getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg) {
+ ContainerProtos.ContainerCommandResponseProto.Builder builder =
+ getContainerResponse(msg, ContainerProtos.Result.SUCCESS, "");
+ return builder.build();
+ }
+
+ /**
+ * Returns a ReadContainer Response.
+ *
+ * @param msg Request
+ * @return Response.
+ */
+ public static ContainerProtos.ContainerCommandResponseProto
+ getReadContainerResponse(ContainerProtos.ContainerCommandRequestProto msg,
+ ContainerData containerData) {
+ Preconditions.checkNotNull(containerData);
+
+ ContainerProtos.ReadContainerResponseProto.Builder response =
+ ContainerProtos.ReadContainerResponseProto.newBuilder();
+ response.setContainerData(containerData.getProtoBufMessage());
+
+ ContainerProtos.ContainerCommandResponseProto.Builder builder =
+ getContainerResponse(msg, ContainerProtos.Result.SUCCESS, "");
+ builder.setReadContainer(response);
+ return builder.build();
+ }
+
+ /**
+ * We found a command type but no associated payload for the command. Hence
+ * return malformed Command as response.
+ *
+ * @param msg - Protobuf message.
+ * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
+ */
+ public static ContainerProtos.ContainerCommandResponseProto.Builder
+ getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg,
+ ContainerProtos.Result result, String message) {
+ return
+ ContainerProtos.ContainerCommandResponseProto.newBuilder()
+ .setCmdType(msg.getCmdType())
+ .setTraceID(msg.getTraceID())
+ .setResult(result)
+ .setMessage(message);
+ }
+
+ /**
+ * We found a command type but no associated payload for the command. Hence
+ * return malformed Command as response.
+ *
+ * @param msg - Protobuf message.
+ * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
+ */
+ public static ContainerProtos.ContainerCommandResponseProto
+ malformedRequest(ContainerProtos.ContainerCommandRequestProto msg) {
+ return getContainerResponse(msg, ContainerProtos.Result.MALFORMED_REQUEST,
+ "Cmd type does not match the payload.").build();
+ }
+
+ /**
+ * We found a command type that is not supported yet.
+ *
+ * @param msg - Protobuf message.
+ * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
+ */
+ public static ContainerProtos.ContainerCommandResponseProto
+ unsupportedRequest(ContainerProtos.ContainerCommandRequestProto msg) {
+ return getContainerResponse(msg, ContainerProtos.Result.UNSUPPORTED_REQUEST,
+ "Server does not support this command yet.").build();
+ }
+
+ private ContainerUtils() {
+ //never constructed.
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java
new file mode 100644
index 0000000..140341c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/Pipeline.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * A pipeline represents the group of machines over which a container lives.
+ */
+public class Pipeline {
+ private String containerName;
+ private String leaderID;
+ private Map<String, DatanodeID> datanodes;
+
+ /**
+ * Constructs a new pipeline data structure.
+ *
+ * @param leaderID - First machine in this pipeline.
+ */
+ public Pipeline(String leaderID) {
+ this.leaderID = leaderID;
+ datanodes = new TreeMap<>();
+ }
+
+ /**
+ * Gets pipeline object from protobuf.
+ *
+ * @param pipeline - ProtoBuf definition for the pipeline.
+ * @return Pipeline Object
+ */
+ public static Pipeline getFromProtoBuf(ContainerProtos.Pipeline pipeline) {
+ Preconditions.checkNotNull(pipeline);
+ Pipeline newPipeline = new Pipeline(pipeline.getLeaderID());
+ for (HdfsProtos.DatanodeIDProto dataID : pipeline.getMembersList()) {
+ newPipeline.addMember(DatanodeID.getFromProtoBuf(dataID));
+ }
+ if (pipeline.hasContainerName()) {
+ newPipeline.containerName = newPipeline.getContainerName();
+ }
+ return newPipeline;
+ }
+
+ /** Adds a member to pipeline */
+
+ /**
+ * Adds a member to the pipeline.
+ *
+ * @param dataNodeId - Datanode to be added.
+ */
+ public void addMember(DatanodeID dataNodeId) {
+ datanodes.put(dataNodeId.getDatanodeUuid(), dataNodeId);
+ }
+
+ /**
+ * Returns the first machine in the set of datanodes.
+ *
+ * @return First Machine.
+ */
+ public DatanodeID getLeader() {
+ return datanodes.get(leaderID);
+ }
+
+ /**
+ * Returns all machines that make up this pipeline.
+ *
+ * @return List of Machines.
+ */
+ public List<DatanodeID> getMachines() {
+ return new ArrayList<>(datanodes.values());
+ }
+
+ /**
+ * Return a Protobuf Pipeline message from pipeline.
+ *
+ * @return Protobuf message
+ */
+ public ContainerProtos.Pipeline getProtobufMessage() {
+ ContainerProtos.Pipeline.Builder builder =
+ ContainerProtos.Pipeline.newBuilder();
+ for (DatanodeID datanode : datanodes.values()) {
+ builder.addMembers(datanode.getProtoBufMessage());
+ }
+ builder.setLeaderID(leaderID);
+ if (this.containerName != null) {
+ builder.setContainerName(this.containerName);
+ }
+ return builder.build();
+ }
+
+ /**
+ * Returns containerName if available.
+ *
+ * @return String.
+ */
+ public String getContainerName() {
+ return containerName;
+ }
+
+ /**
+ * Sets the container Name.
+ *
+ * @param containerName - Name of the container.
+ */
+ public void setContainerName(String containerName) {
+ this.containerName = containerName;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java
new file mode 100644
index 0000000..fe7e37a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+/**
+ Contains protocol buffer helper classes.
+ **/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
new file mode 100644
index 0000000..7a45557
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.impl;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Ozone Container dispatcher takes a call from the netty server and routes it
+ * to the right handler function.
+ */
+public class Dispatcher implements ContainerDispatcher {
+ static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
+
+ private final ContainerManager containerManager;
+
+ /**
+ * Constructs an OzoneContainer that receives calls from
+ * XceiverServerHandler.
+ *
+ * @param containerManager - A class that manages containers.
+ */
+ public Dispatcher(ContainerManager containerManager) {
+ Preconditions.checkNotNull(containerManager);
+ this.containerManager = containerManager;
+ }
+
+ @Override
+ public ContainerCommandResponseProto dispatch(
+ ContainerCommandRequestProto msg) throws IOException {
+ Preconditions.checkNotNull(msg);
+ Type cmdType = msg.getCmdType();
+ if ((cmdType == Type.CreateContainer) ||
+ (cmdType == Type.DeleteContainer) ||
+ (cmdType == Type.ReadContainer) ||
+ (cmdType == Type.ListContainer) ||
+ (cmdType == Type.UpdateContainer)) {
+
+ return containerProcessHandler(msg);
+ }
+
+
+ return ContainerUtils.unsupportedRequest(msg);
+ }
+
+ /**
+ * Handles the all Container related functionality.
+ *
+ * @param msg - command
+ * @return - response
+ * @throws IOException
+ */
+ private ContainerCommandResponseProto containerProcessHandler(
+ ContainerCommandRequestProto msg) throws IOException {
+ try {
+ ContainerData cData = ContainerData.getFromProtBuf(
+ msg.getCreateContainer().getContainerData());
+
+ Pipeline pipeline = Pipeline.getFromProtoBuf(
+ msg.getCreateContainer().getPipeline());
+ Preconditions.checkNotNull(pipeline);
+
+ switch (msg.getCmdType()) {
+ case CreateContainer:
+ return handleCreateContainer(msg, cData, pipeline);
+
+ case DeleteContainer:
+ return handleDeleteContainer(msg, cData, pipeline);
+
+ case ListContainer:
+ return ContainerUtils.unsupportedRequest(msg);
+
+ case UpdateContainer:
+ return ContainerUtils.unsupportedRequest(msg);
+
+ case ReadContainer:
+ return handleReadContainer(msg, cData);
+
+ default:
+ return ContainerUtils.unsupportedRequest(msg);
+ }
+ } catch (IOException ex) {
+ LOG.warn("Container operation failed. " +
+ "Container: {} Operation: {} trace ID: {} Error: {}",
+ msg.getCreateContainer().getContainerData().getName(),
+ msg.getCmdType().name(),
+ msg.getTraceID(),
+ ex.toString());
+
+ // TODO : Replace with finer error codes.
+ return ContainerUtils.getContainerResponse(msg,
+ ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
+ ex.toString()).build();
+ }
+ }
+
+ /**
+ * Calls into container logic and returns appropriate response.
+ *
+ * @param msg - Request
+ * @param cData - Container Data object
+ * @return ContainerCommandResponseProto
+ * @throws IOException
+ */
+ private ContainerCommandResponseProto handleReadContainer(
+ ContainerCommandRequestProto msg, ContainerData cData)
+ throws IOException {
+
+ if (!msg.hasReadContainer()) {
+ LOG.debug("Malformed read container request. trace ID: {}",
+ msg.getTraceID());
+ return ContainerUtils.malformedRequest(msg);
+ }
+ ContainerData container = this.containerManager.readContainer(
+ cData.getContainerName());
+ return ContainerUtils.getReadContainerResponse(msg, container);
+ }
+
+ /**
+ * Calls into container logic and returns appropriate response.
+ *
+ * @param msg - Request
+ * @param cData - ContainerData
+ * @param pipeline - Pipeline is the machines where this container lives.
+ * @return Response.
+ * @throws IOException
+ */
+ private ContainerCommandResponseProto handleDeleteContainer(
+ ContainerCommandRequestProto msg, ContainerData cData,
+ Pipeline pipeline) throws IOException {
+ if (!msg.hasDeleteContainer()) {
+ LOG.debug("Malformed delete container request. trace ID: {}",
+ msg.getTraceID());
+ return ContainerUtils.malformedRequest(msg);
+ }
+ this.containerManager.deleteContainer(pipeline,
+ cData.getContainerName());
+ return ContainerUtils.getContainerResponse(msg);
+ }
+
+ /**
+ * Calls into container logic and returns appropriate response.
+ *
+ * @param msg - Request
+ * @param cData - ContainerData
+ * @param pipeline - Pipeline is the machines where this container lives.
+ * @return Response.
+ * @throws IOException
+ */
+ private ContainerCommandResponseProto handleCreateContainer(
+ ContainerCommandRequestProto msg, ContainerData cData,
+ Pipeline pipeline) throws IOException {
+ if (!msg.hasCreateContainer()) {
+ LOG.debug("Malformed create container request. trace ID: {}",
+ msg.getTraceID());
+ return ContainerUtils.malformedRequest(msg);
+ }
+ this.containerManager.createContainer(pipeline, cData);
+ return ContainerUtils.getContainerResponse(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java
new file mode 100644
index 0000000..16da5d9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.impl;
+
+/**
+ This package is contains Ozone container implementation.
+**/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
new file mode 100644
index 0000000..6ad8377
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+
+import java.io.IOException;
+
+/**
+ * Dispatcher acts as the bridge between the transport layer and
+ * the actual container layer. This layer is capable of transforming
+ * protobuf objects into corresponding class and issue the function call
+ * into the lower layers.
+ *
+ * The reply from the request is dispatched to the client.
+ */
+public interface ContainerDispatcher {
+ /**
+ * Dispatches commands to container layer.
+ * @param msg - Command Request
+ * @return Command Response
+ * @throws IOException
+ */
+ ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg)
+ throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
new file mode 100644
index 0000000..780d932
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.interfaces;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface for container operations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ContainerManager {
+
+ /**
+ * Creates a container with the given name.
+ *
+ * @param pipeline -- Nodes which make up this container.
+ * @param containerData - Container Name and metadata.
+ * @throws IOException
+ */
+ void createContainer(Pipeline pipeline, ContainerData containerData)
+ throws IOException;
+
+ /**
+ * Deletes an existing container.
+ *
+ * @param pipeline - nodes that make this container.
+ * @param containerName - name of the container.
+ * @throws IOException
+ */
+ void deleteContainer(Pipeline pipeline, String containerName)
+ throws IOException;
+
+ /**
+ * As simple interface for container Iterations.
+ *
+ * @param start - Starting index
+ * @param count - how many to return
+ * @param data - Actual containerData
+ * @throws IOException
+ */
+ void listContainer(long start, long count, List<ContainerData> data)
+ throws IOException;
+
+ /**
+ * Get metadata about a specific container.
+ *
+ * @param containerName - Name of the container
+ * @return ContainerData
+ * @throws IOException
+ */
+ ContainerData readContainer(String containerName) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java
new file mode 100644
index 0000000..1638a36
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common;
+/**
+ Common Container Layer. At this layer the abstractions are:
+
+ 1. Containers - Both data and metadata containers.
+ 2. Keys - Key/Value pairs that live inside a container.
+ 3. Chunks - Keys can be composed of many chunks.
+
+ Ozone uses these abstractions to build Volumes, Buckets and Keys.
+
+ **/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java
new file mode 100644
index 0000000..05cd44a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClient.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.client;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A Client for the storageContainer protocol.
+ */
+public class XceiverClient {
+ static final Logger LOG = LoggerFactory.getLogger(XceiverClient.class);
+ private final Pipeline pipeline;
+ private final OzoneConfiguration config;
+ private ChannelFuture channelFuture;
+ private Bootstrap b;
+ private EventLoopGroup group;
+
+ /**
+ * Constructs a client that can communicate with the Container framework on
+ * data nodes.
+ * @param pipeline - Pipeline that defines the machines.
+ * @param config -- Ozone Config
+ */
+ public XceiverClient(Pipeline pipeline, OzoneConfiguration config) {
+ Preconditions.checkNotNull(pipeline);
+ Preconditions.checkNotNull(config);
+ this.pipeline = pipeline;
+ this.config = config;
+ }
+
+ /**
+ * Connects to the leader in the pipeline.
+ */
+ public void connect() throws Exception {
+ if (channelFuture != null
+ && channelFuture.channel() != null
+ && channelFuture.channel().isActive()) {
+ throw new IOException("This client is already connected to a host.");
+ }
+
+ group = new NioEventLoopGroup();
+ b = new Bootstrap();
+ b.group(group)
+ .channel(NioSocketChannel.class)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .handler(new XceiverClientInitializer(this.pipeline));
+ DatanodeID leader = this.pipeline.getLeader();
+
+ // read port from the data node, on failure use default configured
+ // port.
+ int port = leader.getContainerPort();
+ if (port == 0) {
+ port = config.getInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
+ OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT);
+ }
+ LOG.debug("Connecting to server Port : " + port);
+ channelFuture = b.connect(leader.getHostName(), port).sync();
+ }
+
+ /**
+ * Close the client.
+ */
+ public void close() {
+ if(group != null) {
+ group.shutdownGracefully();
+ }
+
+ if (channelFuture != null) {
+ channelFuture.channel().close();
+ }
+ }
+
+ /**
+ * Sends a given command to server and gets the reply back.
+ * @param request Request
+ * @return Response to the command
+ * @throws IOException
+ */
+ public ContainerProtos.ContainerCommandResponseProto sendCommand(
+ ContainerProtos.ContainerCommandRequestProto request)
+ throws IOException {
+ if((channelFuture == null) || (!channelFuture.channel().isActive())) {
+ throw new IOException("This channel is not connected.");
+ }
+ XceiverClientHandler handler =
+ channelFuture.channel().pipeline().get(XceiverClientHandler.class);
+
+ return handler.sendCommand(request);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java
new file mode 100644
index 0000000..a219e4e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.transport.client;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Netty client handler.
+ */
+public class XceiverClientHandler extends
+ SimpleChannelInboundHandler<ContainerProtos.ContainerCommandResponseProto> {
+
+ static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
+ private final BlockingQueue<ContainerProtos.ContainerCommandResponseProto>
+ responses = new LinkedBlockingQueue<>();
+ private final Pipeline pipeline;
+ private volatile Channel channel;
+
+ /**
+ * Constructs a client that can communicate to a container server.
+ */
+ public XceiverClientHandler(Pipeline pipeline) {
+ super(false);
+ this.pipeline = pipeline;
+ }
+
+ /**
+ * <strong>Please keep in mind that this method will be renamed to {@code
+ * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
+ * <p>
+ * Is called for each message of type {@link ContainerProtos
+ * .ContainerCommandResponseProto}.
+ *
+ * @param ctx the {@link ChannelHandlerContext} which this {@link
+ * SimpleChannelInboundHandler} belongs to
+ * @param msg the message to handle
+ * @throws Exception is thrown if an error occurred
+ */
+ @Override
+ public void channelRead0(ChannelHandlerContext ctx,
+ ContainerProtos.ContainerCommandResponseProto msg)
+ throws Exception {
+ responses.add(msg);
+ }
+
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) {
+ LOG.debug("channelRegistered: Connected to ctx");
+ channel = ctx.channel();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ LOG.info("Exception in client " + cause.toString());
+ ctx.close();
+ }
+
+ /**
+ * Since netty is async, we send a work request and then wait until a response
+ * appears in the reply queue. This is simple sync interface for clients. we
+ * should consider building async interfaces for client if this turns out to
+ * be a performance bottleneck.
+ *
+ * @param request - request.
+ * @return -- response
+ */
+ public ContainerProtos.ContainerCommandResponseProto
+ sendCommand(ContainerProtos.ContainerCommandRequestProto request) {
+
+ ContainerProtos.ContainerCommandResponseProto response;
+ channel.writeAndFlush(request);
+ boolean interrupted = false;
+ for (; ; ) {
+ try {
+ response = responses.take();
+ break;
+ } catch (InterruptedException ignore) {
+ interrupted = true;
+ }
+ }
+
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ return response;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientInitializer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientInitializer.java
new file mode 100644
index 0000000..cbf8ee9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/client/XceiverClientInitializer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.transport.client;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.apache.hadoop.ozone.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+
+/**
+ * Setup the netty pipeline.
+ */
+public class XceiverClientInitializer extends
+ ChannelInitializer<SocketChannel> {
+ private final Pipeline pipeline;
+
+ /**
+ * Constructs an Initializer for the client pipeline.
+ * @param pipeline - Pipeline.
+ */
+ public XceiverClientInitializer(Pipeline pipeline) {
+ this.pipeline = pipeline;
+ }
+
+ /**
+ * This method will be called once when the Channel is registered. After
+ * the method returns this instance will be removed from the
+ * ChannelPipeline of the Channel.
+ *
+ * @param ch Channel which was registered.
+ * @throws Exception is thrown if an error occurs. In that case the
+ * Channel will be closed.
+ */
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+
+ p.addLast(new ProtobufVarint32FrameDecoder());
+ p.addLast(new ProtobufDecoder(ContainerProtos
+ .ContainerCommandResponseProto.getDefaultInstance()));
+
+ p.addLast(new ProtobufVarint32LengthFieldPrepender());
+ p.addLast(new ProtobufEncoder());
+
+ p.addLast(new XceiverClientHandler(this.pipeline));
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
new file mode 100644
index 0000000..77e4af1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+
+/**
+ * Creates a netty server endpoint that acts as the communication layer for
+ * Ozone containers.
+ */
+public final class XceiverServer {
+ private final int port;
+ private final ContainerDispatcher storageContainer;
+
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private Channel channel;
+
+ /**
+ * Constructs a netty server class.
+ *
+ * @param conf - Configuration
+ */
+ public XceiverServer(OzoneConfiguration conf,
+ ContainerDispatcher dispatcher) {
+ Preconditions.checkNotNull(conf);
+ this.port = conf.getInt(OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT,
+ OzoneConfigKeys.DFS_OZONE_CONTAINER_IPC_PORT_DEFAULT);
+ this.storageContainer = dispatcher;
+ }
+
+ /**
+ * Starts running the server.
+ *
+ * @throws Exception
+ */
+ public void start() throws Exception {
+ bossGroup = new NioEventLoopGroup();
+ workerGroup = new NioEventLoopGroup();
+ channel = new ServerBootstrap()
+ .group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .handler(new LoggingHandler(LogLevel.INFO))
+ .childHandler(new XceiverServerInitializer(storageContainer))
+ .bind(port)
+ .syncUninterruptibly()
+ .channel();
+ }
+
+ /**
+ * Stops a running server.
+ *
+ * @throws Exception
+ */
+ public void stop() throws Exception {
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ }
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ }
+ if (channel != null) {
+ channel.close().awaitUninterruptibly();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
new file mode 100644
index 0000000..c4a8f53
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerHandler.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+
+/**
+ * Netty server handlers that respond to Network events.
+ */
+public class XceiverServerHandler extends
+ SimpleChannelInboundHandler<ContainerCommandRequestProto> {
+
+ static final Logger LOG = LoggerFactory.getLogger(XceiverServerHandler.class);
+ private final ContainerDispatcher dispatcher;
+
+ /**
+ * Constructor for server handler.
+ * @param dispatcher - Dispatcher interface
+ */
+ public XceiverServerHandler(ContainerDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ /**
+ * <strong>Please keep in mind that this method will be renamed to {@code
+ * messageReceived(ChannelHandlerContext, I)} in 5.0.</strong>
+ * <p>
+ * Is called for each message of type {@link ContainerCommandRequestProto}.
+ *
+ * @param ctx the {@link ChannelHandlerContext} which this {@link
+ * SimpleChannelInboundHandler} belongs to
+ * @param msg the message to handle
+ * @throws Exception is thrown if an error occurred
+ */
+ @Override
+ public void channelRead0(ChannelHandlerContext ctx,
+ ContainerCommandRequestProto msg) throws
+ Exception {
+ ContainerCommandResponseProto response = this.dispatcher.dispatch(msg);
+ LOG.debug("Writing the reponse back to client.");
+ ctx.writeAndFlush(response);
+
+ }
+
+ /**
+ * Calls {@link ChannelHandlerContext#fireExceptionCaught(Throwable)}
+ * Sub-classes may override this method to change behavior.
+ *
+ * @param ctx - Channel Handler Context
+ * @param cause - Exception
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ throws Exception {
+ LOG.error("An exception caught in the pipeline : " + cause.toString());
+ super.exceptionCaught(ctx, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
new file mode 100644
index 0000000..4d32d86
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerInitializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.transport.server;
+
+import com.google.common.base.Preconditions;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.protobuf.ProtobufDecoder;
+import io.netty.handler.codec.protobuf.ProtobufEncoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+
+/**
+ * Creates a channel for the XceiverServer.
+ */
+public class XceiverServerInitializer extends ChannelInitializer<SocketChannel>{
+ private final ContainerDispatcher dispatcher;
+ public XceiverServerInitializer(ContainerDispatcher dispatcher) {
+ Preconditions.checkNotNull(dispatcher);
+ this.dispatcher = dispatcher;
+ }
+
+ /**
+ * This method will be called once the Channel is registered. After
+ * the method returns this instance will be removed from the {@link
+ * ChannelPipeline}
+ *
+ * @param ch the which was registered.
+ * @throws Exception is thrown if an error occurs. In that case the channel
+ * will be closed.
+ */
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast(new ProtobufVarint32FrameDecoder());
+ pipeline.addLast(new ProtobufDecoder(ContainerCommandRequestProto
+ .getDefaultInstance()));
+ pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
+ pipeline.addLast(new ProtobufEncoder());
+ pipeline.addLast(new XceiverServerHandler(dispatcher));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerData.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerData.java
deleted file mode 100644
index dd2d173..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerData.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.helpers;
-
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * This class maintains the information about a container in the ozone world.
- * <p>
- * A container is a name, along with metadata- which is a set of key value
- * pair.
- */
-public class ContainerData {
-
- private final String containerName;
- private final Map<String, String> metadata;
-
- private String path;
-
- /**
- * Constructs a ContainerData Object.
- *
- * @param containerName - Name
- */
- public ContainerData(String containerName) {
- this.metadata = new TreeMap<>();
- this.containerName = containerName;
- }
-
- /**
- * Constructs a ContainerData object from ProtoBuf classes.
- *
- * @param protoData - ProtoBuf Message
- * @throws IOException
- */
- public static ContainerData getFromProtBuf(
- ContainerProtos.ContainerData protoData) throws IOException {
- ContainerData data = new ContainerData(protoData.getName());
- for (int x = 0; x < protoData.getMetadataCount(); x++) {
- data.addMetadata(protoData.getMetadata(x).getKey(),
- protoData.getMetadata(x).getValue());
- }
-
- if (protoData.hasContainerPath()) {
- data.setPath(protoData.getContainerPath());
- }
- return data;
- }
-
- /**
- * Returns a ProtoBuf Message from ContainerData.
- *
- * @return Protocol Buffer Message
- */
- public ContainerProtos.ContainerData getProtoBufMessage() {
- ContainerProtos.ContainerData.Builder builder = ContainerProtos
- .ContainerData.newBuilder();
- builder.setName(this.getContainerName());
- if (this.getPath() != null) {
- builder.setContainerPath(this.getPath());
- }
- for (Map.Entry<String, String> entry : metadata.entrySet()) {
- ContainerProtos.KeyValue.Builder keyValBuilder =
- ContainerProtos.KeyValue.newBuilder();
- builder.addMetadata(keyValBuilder.setKey(entry.getKey())
- .setValue(entry.getValue()).build());
- }
- return builder.build();
- }
-
- /**
- * Returns the name of the container.
- *
- * @return - name
- */
- public String getContainerName() {
- return containerName;
- }
-
- /**
- * Adds metadata.
- */
- public void addMetadata(String key, String value) throws IOException {
- synchronized (this.metadata) {
- if (this.metadata.containsKey(key)) {
- throw new IOException("This key already exists. Key " + key);
- }
- metadata.put(key, value);
- }
- }
-
- /**
- * Returns all metadata.
- */
- public Map<String, String> getAllMetadata() {
- synchronized (this.metadata) {
- return Collections.unmodifiableMap(this.metadata);
- }
- }
-
- /**
- * Returns value of a key.
- */
- public String getValue(String key) {
- synchronized (this.metadata) {
- return metadata.get(key);
- }
- }
-
- /**
- * Deletes a metadata entry from the map.
- *
- * @param key - Key
- */
- public void deleteKey(String key) {
- synchronized (this.metadata) {
- metadata.remove(key);
- }
- }
-
- /**
- * Returns path.
- *
- * @return - path
- */
- public String getPath() {
- return path;
- }
-
- /**
- * Sets path.
- *
- * @param path - String.
- */
- public void setPath(String path) {
- this.path = path;
- }
-
- /**
- * This function serves as the generic key for OzoneCache class. Both
- * ContainerData and ContainerKeyData overrides this function to appropriately
- * return the right name that can be used in OzoneCache.
- *
- * @return String Name.
- */
- public String getName() {
- return getContainerName();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerUtils.java
deleted file mode 100644
index 6aef443..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/ContainerUtils.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.helpers;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
-
-/**
- * A set of helper functions to create proper responses.
- */
-public final class ContainerUtils {
-
- /**
- * Returns a CreateContainer Response. This call is used by create and delete
- * containers which have null success responses.
- *
- * @param msg Request
- * @return Response.
- */
- public static ContainerProtos.ContainerCommandResponseProto
- getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg) {
- ContainerProtos.ContainerCommandResponseProto.Builder builder =
- getContainerResponse(msg, ContainerProtos.Result.SUCCESS, "");
- return builder.build();
- }
-
- /**
- * Returns a ReadContainer Response.
- *
- * @param msg Request
- * @return Response.
- */
- public static ContainerProtos.ContainerCommandResponseProto
- getReadContainerResponse(ContainerProtos.ContainerCommandRequestProto msg,
- ContainerData containerData) {
- Preconditions.checkNotNull(containerData);
-
- ContainerProtos.ReadContainerResponseProto.Builder response =
- ContainerProtos.ReadContainerResponseProto.newBuilder();
- response.setContainerData(containerData.getProtoBufMessage());
-
- ContainerProtos.ContainerCommandResponseProto.Builder builder =
- getContainerResponse(msg, ContainerProtos.Result.SUCCESS, "");
- builder.setReadContainer(response);
- return builder.build();
- }
-
- /**
- * We found a command type but no associated payload for the command. Hence
- * return malformed Command as response.
- *
- * @param msg - Protobuf message.
- * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
- */
- public static ContainerProtos.ContainerCommandResponseProto.Builder
- getContainerResponse(ContainerProtos.ContainerCommandRequestProto msg,
- ContainerProtos.Result result, String message) {
- return
- ContainerProtos.ContainerCommandResponseProto.newBuilder()
- .setCmdType(msg.getCmdType())
- .setTraceID(msg.getTraceID())
- .setResult(result)
- .setMessage(message);
- }
-
- /**
- * We found a command type but no associated payload for the command. Hence
- * return malformed Command as response.
- *
- * @param msg - Protobuf message.
- * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
- */
- public static ContainerProtos.ContainerCommandResponseProto
- malformedRequest(ContainerProtos.ContainerCommandRequestProto msg) {
- return getContainerResponse(msg, ContainerProtos.Result.MALFORMED_REQUEST,
- "Cmd type does not match the payload.").build();
- }
-
- /**
- * We found a command type that is not supported yet.
- *
- * @param msg - Protobuf message.
- * @return ContainerCommandResponseProto - MALFORMED_REQUEST.
- */
- public static ContainerProtos.ContainerCommandResponseProto
- unsupportedRequest(ContainerProtos.ContainerCommandRequestProto msg) {
- return getContainerResponse(msg, ContainerProtos.Result.UNSUPPORTED_REQUEST,
- "Server does not support this command yet.").build();
- }
-
- private ContainerUtils() {
- //never constructed.
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java
deleted file mode 100644
index d1bcc8d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/Pipeline.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.helpers;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * A pipeline represents the group of machines over which a container lives.
- */
-public class Pipeline {
- private String containerName;
- private String leaderID;
- private Map<String, DatanodeID> datanodes;
-
- /**
- * Constructs a new pipeline data structure.
- *
- * @param leaderID - First machine in this pipeline.
- */
- public Pipeline(String leaderID) {
- this.leaderID = leaderID;
- datanodes = new TreeMap<>();
- }
-
- /**
- * Gets pipeline object from protobuf.
- *
- * @param pipeline - ProtoBuf definition for the pipeline.
- * @return Pipeline Object
- */
- public static Pipeline getFromProtoBuf(ContainerProtos.Pipeline pipeline) {
- Preconditions.checkNotNull(pipeline);
- Pipeline newPipeline = new Pipeline(pipeline.getLeaderID());
- for (HdfsProtos.DatanodeIDProto dataID : pipeline.getMembersList()) {
- newPipeline.addMember(DatanodeID.getFromProtoBuf(dataID));
- }
- if (pipeline.hasContainerName()) {
- newPipeline.containerName = newPipeline.getContainerName();
- }
- return newPipeline;
- }
-
- /** Adds a member to pipeline */
-
- /**
- * Adds a member to the pipeline.
- *
- * @param dataNodeId - Datanode to be added.
- */
- public void addMember(DatanodeID dataNodeId) {
- datanodes.put(dataNodeId.getDatanodeUuid(), dataNodeId);
- }
-
- /**
- * Returns the first machine in the set of datanodes.
- *
- * @return First Machine.
- */
- public DatanodeID getLeader() {
- return datanodes.get(leaderID);
- }
-
- /**
- * Returns all machines that make up this pipeline.
- *
- * @return List of Machines.
- */
- public List<DatanodeID> getMachines() {
- return new ArrayList<>(datanodes.values());
- }
-
- /**
- * Return a Protobuf Pipeline message from pipeline.
- *
- * @return Protobuf message
- */
- public ContainerProtos.Pipeline getProtobufMessage() {
- ContainerProtos.Pipeline.Builder builder =
- ContainerProtos.Pipeline.newBuilder();
- for (DatanodeID datanode : datanodes.values()) {
- builder.addMembers(datanode.getProtoBufMessage());
- }
- builder.setLeaderID(leaderID);
- if (this.containerName != null) {
- builder.setContainerName(this.containerName);
- }
- return builder.build();
- }
-
- /**
- * Returns containerName if available.
- *
- * @return String.
- */
- public String getContainerName() {
- return containerName;
- }
-
- /**
- * Sets the container Name.
- *
- * @param containerName - Name of the container.
- */
- public void setContainerName(String containerName) {
- this.containerName = containerName;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/package-info.java
deleted file mode 100644
index 15a4a28..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/helpers/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.helpers;
-/**
- Contains protocol buffer helper classes.
- **/
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java
deleted file mode 100644
index f587b2a..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerDispatcher.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.interfaces;
-
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
-
-import java.io.IOException;
-
-/**
- * Dispatcher acts as the bridge between the transport layer and
- * the actual container layer. This layer is capable of transforming
- * protobuf objects into corresponding class and issue the function call
- * into the lower layers.
- *
- * The reply from the request is dispatched to the client.
- */
-public interface ContainerDispatcher {
- /**
- * Dispatches commands to container layer.
- * @param msg - Command Request
- * @return Command Response
- * @throws IOException
- */
- ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg)
- throws IOException;
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerManager.java
deleted file mode 100644
index f98544d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/interfaces/ContainerManager.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.interfaces;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.ozone.container.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.helpers.Pipeline;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Interface for container operations.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public interface ContainerManager {
-
- /**
- * Creates a container with the given name.
- *
- * @param pipeline -- Nodes which make up this container.
- * @param containerData - Container Name and metadata.
- * @throws IOException
- */
- void createContainer(Pipeline pipeline, ContainerData containerData)
- throws IOException;
-
- /**
- * Deletes an existing container.
- *
- * @param pipeline - nodes that make this container.
- * @param containerName - name of the container.
- * @throws IOException
- */
- void deleteContainer(Pipeline pipeline, String containerName)
- throws IOException;
-
- /**
- * As simple interface for container Iterations.
- *
- * @param start - Starting index
- * @param count - how many to return
- * @param data - Actual containerData
- * @throws IOException
- */
- void listContainer(long start, long count, List<ContainerData> data)
- throws IOException;
-
- /**
- * Get metadata about a specific container.
- *
- * @param containerName - Name of the container
- * @return ContainerData
- * @throws IOException
- */
- ContainerData readContainer(String containerName) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c73a32c2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/Dispatcher.java
deleted file mode 100644
index 92aa241..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/Dispatcher.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.ozoneimpl;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Type;
-import org.apache.hadoop.ozone.container.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.helpers.ContainerUtils;
-import org.apache.hadoop.ozone.container.helpers.Pipeline;
-import org.apache.hadoop.ozone.container.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.interfaces.ContainerManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Ozone Container dispatcher takes a call from the netty server and routes it
- * to the right handler function.
- */
-public class Dispatcher implements ContainerDispatcher {
- static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
-
- private final ContainerManager containerManager;
-
- /**
- * Constructs an OzoneContainer that receives calls from
- * XceiverServerHandler.
- *
- * @param containerManager - A class that manages containers.
- */
- public Dispatcher(ContainerManager containerManager) {
- Preconditions.checkNotNull(containerManager);
- this.containerManager = containerManager;
- }
-
- @Override
- public ContainerCommandResponseProto dispatch(
- ContainerCommandRequestProto msg) throws IOException {
- Preconditions.checkNotNull(msg);
- Type cmdType = msg.getCmdType();
- if ((cmdType == Type.CreateContainer) ||
- (cmdType == Type.DeleteContainer) ||
- (cmdType == Type.ReadContainer) ||
- (cmdType == Type.ListContainer)) {
-
- return containerProcessHandler(msg);
- }
-
-
- return ContainerUtils.unsupportedRequest(msg);
- }
-
- /**
- * Handles the all Container related functionality.
- *
- * @param msg - command
- * @return - response
- * @throws IOException
- */
- private ContainerCommandResponseProto containerProcessHandler(
- ContainerCommandRequestProto msg) throws IOException {
- try {
- ContainerData cData = ContainerData.getFromProtBuf(
- msg.getCreateContainer().getContainerData());
-
- Pipeline pipeline = Pipeline.getFromProtoBuf(
- msg.getCreateContainer().getPipeline());
- Preconditions.checkNotNull(pipeline);
-
- switch (msg.getCmdType()) {
- case CreateContainer:
- return handleCreateContainer(msg, cData, pipeline);
-
- case DeleteContainer:
- return handleDeleteContainer(msg, cData, pipeline);
-
- case ListContainer:
- return ContainerUtils.unsupportedRequest(msg);
-
- case ReadContainer:
- return handleReadContainer(msg, cData);
-
- default:
- return ContainerUtils.unsupportedRequest(msg);
- }
- } catch (IOException ex) {
- LOG.warn("Container operation failed. " +
- "Container: {} Operation: {} trace ID: {} Error: {}",
- msg.getCreateContainer().getContainerData().getName(),
- msg.getCmdType().name(),
- msg.getTraceID(),
- ex.toString());
-
- // TODO : Replace with finer error codes.
- return ContainerUtils.getContainerResponse(msg,
- ContainerProtos.Result.CONTAINER_INTERNAL_ERROR,
- ex.toString()).build();
- }
- }
-
- /**
- * Calls into container logic and returns appropriate response.
- *
- * @param msg - Request
- * @param cData - Container Data object
- * @return ContainerCommandResponseProto
- * @throws IOException
- */
- private ContainerCommandResponseProto handleReadContainer(
- ContainerCommandRequestProto msg, ContainerData cData)
- throws IOException {
-
- if (!msg.hasReadContainer()) {
- LOG.debug("Malformed read container request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
- ContainerData container = this.containerManager.readContainer(
- cData.getContainerName());
- return ContainerUtils.getReadContainerResponse(msg, container);
- }
-
- /**
- * Calls into container logic and returns appropriate response.
- *
- * @param msg - Request
- * @param cData - ContainerData
- * @param pipeline - Pipeline is the machines where this container lives.
- * @return Response.
- * @throws IOException
- */
- private ContainerCommandResponseProto handleDeleteContainer(
- ContainerCommandRequestProto msg, ContainerData cData,
- Pipeline pipeline) throws IOException {
- if (!msg.hasDeleteContainer()) {
- LOG.debug("Malformed delete container request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
- this.containerManager.deleteContainer(pipeline,
- cData.getContainerName());
- return ContainerUtils.getContainerResponse(msg);
- }
-
- /**
- * Calls into container logic and returns appropriate response.
- *
- * @param msg - Request
- * @param cData - ContainerData
- * @param pipeline - Pipeline is the machines where this container lives.
- * @return Response.
- * @throws IOException
- */
- private ContainerCommandResponseProto handleCreateContainer(
- ContainerCommandRequestProto msg, ContainerData cData,
- Pipeline pipeline) throws IOException {
- if (!msg.hasCreateContainer()) {
- LOG.debug("Malformed create container request. trace ID: {}",
- msg.getTraceID());
- return ContainerUtils.malformedRequest(msg);
- }
- this.containerManager.createContainer(pipeline, cData);
- return ContainerUtils.getContainerResponse(msg);
- }
-}