You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/08/18 02:47:48 UTC
[3/3] hadoop git commit: HDFS-12159. Ozone: SCM: Add create
replication pipeline RPC. Contributed by Anu Engineer.
HDFS-12159. Ozone: SCM: Add create replication pipeline RPC. Contributed by Anu Engineer.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a245c60b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a245c60b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a245c60b
Branch: refs/heads/HDFS-7240
Commit: a245c60bb0eb7e5247d90b67c8947ab552af1e9f
Parents: 6c1e9ab
Author: Anu Engineer <ae...@apache.org>
Authored: Thu Aug 17 19:38:26 2017 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Thu Aug 17 19:38:26 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/hdfs/protocol/DatanodeID.java | 23 ++-
.../hadoop/hdfs/protocolPB/PBHelperClient.java | 15 --
.../apache/hadoop/ozone/OzoneConfigKeys.java | 16 ++
.../org/apache/hadoop/ozone/OzoneConsts.java | 2 +
.../apache/hadoop/scm/XceiverClientManager.java | 33 ++++
.../scm/client/ContainerOperationClient.java | 38 ++--
.../org/apache/hadoop/scm/client/ScmClient.java | 50 ++---
.../StorageContainerLocationProtocol.java | 59 +++---
...rLocationProtocolClientSideTranslatorPB.java | 72 ++++---
.../main/java/org/apache/ratis/RatisHelper.java | 2 +-
.../src/main/proto/Ozone.proto | 20 ++
.../StorageContainerLocationProtocol.proto | 47 ++++-
.../src/main/proto/hdfs.proto | 3 +-
.../hadoop/cblock/storage/StorageManager.java | 8 +-
.../statemachine/DatanodeStateMachine.java | 8 +-
.../common/statemachine/StateContext.java | 12 +-
.../states/datanode/InitDatanodeState.java | 2 +
.../common/transport/server/XceiverServer.java | 11 ++
.../transport/server/XceiverServerSpi.java | 9 +
.../server/ratis/XceiverServerRatis.java | 103 +++++++---
.../container/ozoneimpl/OzoneContainer.java | 49 +++--
...rLocationProtocolServerSideTranslatorPB.java | 39 ++--
.../ozone/scm/StorageContainerManager.java | 89 ++++-----
.../ozone/scm/block/BlockManagerImpl.java | 8 +-
.../ozone/scm/container/ContainerMapping.java | 115 ++---------
.../hadoop/ozone/scm/container/Mapping.java | 15 +-
.../ozone/scm/pipelines/PipelineManager.java | 69 +++++++
.../ozone/scm/pipelines/PipelineSelector.java | 198 +++++++++++++++++++
.../ozone/scm/pipelines/package-info.java | 38 ++++
.../scm/pipelines/ratis/RatisManagerImpl.java | 113 +++++++++++
.../ozone/scm/pipelines/ratis/package-info.java | 18 ++
.../standalone/StandaloneManagerImpl.java | 139 +++++++++++++
.../scm/pipelines/standalone/package-info.java | 18 ++
.../hadoop/ozone/scm/ratis/RatisManager.java | 59 ------
.../ozone/scm/ratis/RatisManagerImpl.java | 194 ------------------
.../src/main/resources/ozone-default.xml | 17 ++
.../apache/hadoop/cblock/TestBufferManager.java | 4 +-
.../hadoop/cblock/TestCBlockReadWrite.java | 4 +-
.../hadoop/cblock/TestLocalBlockCache.java | 4 +-
.../hadoop/cblock/util/MockStorageClient.java | 20 +-
.../namenode/TestFavoredNodesEndToEnd.java | 6 +-
.../apache/hadoop/ozone/MiniOzoneCluster.java | 30 +--
.../apache/hadoop/ozone/RatisTestHelper.java | 10 +-
.../hadoop/ozone/TestContainerOperations.java | 5 +-
.../hadoop/ozone/TestMiniOzoneCluster.java | 34 +++-
.../ozone/TestStorageContainerManager.java | 16 +-
.../common/TestDatanodeStateMachine.java | 11 +-
.../ozone/container/common/TestEndPoint.java | 15 +-
.../container/ozoneimpl/TestOzoneContainer.java | 4 +-
.../ozoneimpl/TestOzoneContainerRatis.java | 67 ++++---
.../container/ozoneimpl/TestRatisManager.java | 20 +-
.../placement/TestContainerPlacement.java | 3 +-
.../transport/server/TestContainerServer.java | 10 +-
.../hadoop/ozone/scm/TestAllocateContainer.java | 19 +-
.../ozone/scm/TestContainerSmallFile.java | 13 +-
.../org/apache/hadoop/ozone/scm/TestSCMCli.java | 47 +++--
.../ozone/scm/TestXceiverClientManager.java | 24 ++-
.../scm/container/TestContainerMapping.java | 28 ++-
.../ozone/scm/node/TestContainerPlacement.java | 17 +-
.../ozone/web/client/TestBucketsRatis.java | 3 +-
.../hadoop/ozone/web/client/TestKeysRatis.java | 2 +
.../ozone/web/client/TestVolumeRatis.java | 6 +-
62 files changed, 1369 insertions(+), 764 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
index c563079..517e474 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
@@ -52,7 +52,8 @@ public class DatanodeID implements Comparable<DatanodeID> {
private int infoSecurePort; // info server port
private int ipcPort; // IPC server port
private String xferAddr;
- private int containerPort; // container server port.
+ private int containerPort; // container Stand_alone Rpc port.
+ private int ratisPort; // Container Ratis RPC Port.
/**
* UUID identifying a given datanode. For upgraded Datanodes this is the
@@ -78,7 +79,7 @@ public class DatanodeID implements Comparable<DatanodeID> {
}
/**
- * Create a DatanodeID
+ * Create a DatanodeID.
* @param ipAddr IP
* @param hostName hostname
* @param datanodeUuid data node ID, UUID for new Datanodes, may be the
@@ -296,6 +297,22 @@ public class DatanodeID implements Comparable<DatanodeID> {
}
/**
+ * Gets the Ratis Port.
+ * @return retis port.
+ */
+ public int getRatisPort() {
+ return ratisPort;
+ }
+
+ /**
+ * Sets the Ratis Port.
+ * @param ratisPort - Ratis port.
+ */
+ public void setRatisPort(int ratisPort) {
+ this.ratisPort = ratisPort;
+ }
+
+ /**
* Returns a DataNode ID from the protocol buffers.
*
* @param datanodeIDProto - protoBuf Message
@@ -308,6 +325,7 @@ public class DatanodeID implements Comparable<DatanodeID> {
datanodeIDProto.getXferPort(), datanodeIDProto.getInfoPort(),
datanodeIDProto.getInfoSecurePort(), datanodeIDProto.getIpcPort());
id.setContainerPort(datanodeIDProto.getContainerPort());
+ id.setRatisPort(datanodeIDProto.getRatisPort());
return id;
}
@@ -326,6 +344,7 @@ public class DatanodeID implements Comparable<DatanodeID> {
.setInfoSecurePort(this.getInfoSecurePort())
.setIpcPort(this.getIpcPort())
.setContainerPort(this.getContainerPort())
+ .setRatisPort(this.getRatisPort())
.build();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 72b703a..a16c679 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -184,8 +184,6 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
-import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ChunkedArrayList;
@@ -2489,19 +2487,6 @@ public class PBHelperClient {
return result;
}
- public static ContainerRequestProto.ReplicationFactor
- convertReplicationFactor(ScmClient.ReplicationFactor replicationFactor) {
- switch (replicationFactor) {
- case ONE:
- return ContainerRequestProto.ReplicationFactor.ONE;
- case THREE:
- return ContainerRequestProto.ReplicationFactor.THREE;
- default:
- throw new IllegalArgumentException("Ozone only supports replicaiton" +
- " factor 1 or 3");
- }
- }
-
public static XAttr convertXAttr(XAttrProto a) {
XAttr.Builder builder = new XAttr.Builder();
builder.setNameSpace(convert(a.getNamespace()));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 8a99359..64c7987 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -45,6 +45,22 @@ public final class OzoneConfigKeys {
public static final boolean DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT =
false;
+ /**
+ * Ratis Port where containers listen to.
+ */
+ public static final String DFS_CONTAINER_RATIS_IPC_PORT =
+ "dfs.container.ratis.ipc";
+ public static final int DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT = 50012;
+
+ /**
+ * When set to true, allocate a random free port for ozone container, so that
+ * a mini cluster is able to launch multiple containers on a node.
+ */
+ public static final String DFS_CONTAINER_RATIS_IPC_RANDOM_PORT =
+ "dfs.container.ratis.ipc.random.port";
+ public static final boolean DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT =
+ false;
+
public static final String OZONE_LOCALSTORAGE_ROOT =
"ozone.localstorage.root";
public static final String OZONE_LOCALSTORAGE_ROOT_DEFAULT = "/tmp/ozone";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 5e73045..68f1e09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -123,6 +123,8 @@ public final class OzoneConsts {
*/
public static final int MAX_LISTVOLUMES_SIZE = 1024;
+ public static final int INVALID_PORT = -1;
+
private OzoneConsts() {
// Never Constructed
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
index b0e4e4e..508c004 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
@@ -31,6 +31,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import static org.apache.hadoop.scm.ScmConfigKeys
@@ -164,4 +165,36 @@ public class XceiverClientManager implements Closeable {
clientCache.invalidateAll();
clientCache.cleanUp();
}
+
+ /**
+ * Tells us if Ratis is enabled for this cluster.
+ * @return True if Ratis is enabled.
+ */
+ public boolean isUseRatis() {
+ return useRatis;
+ }
+
+ /**
+ * Returns hard coded 3 as replication factor.
+ * @return 3
+ */
+ public OzoneProtos.ReplicationFactor getFactor() {
+ if(isUseRatis()) {
+ return OzoneProtos.ReplicationFactor.THREE;
+ }
+ return OzoneProtos.ReplicationFactor.ONE;
+ }
+
+ /**
+ * Returns the default replication type.
+ * @return Ratis or Standalone
+ */
+ public OzoneProtos.ReplicationType getType() {
+ // TODO : Fix me and make Ratis default before release.
+ if(isUseRatis()) {
+ return OzoneProtos.ReplicationType.RATIS;
+ }
+ return OzoneProtos.ReplicationType.STAND_ALONE;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
index 5ee70bc..a90cff4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
@@ -72,10 +72,7 @@ public class ContainerOperationClient implements ScmClient {
}
/**
- * Create a container with the given ID as its name.
- * @param containerId - String container ID
- * @return A Pipeline object to actually write/read from the container.
- * @throws IOException
+ * @inheritDoc
*/
@Override
public Pipeline createContainer(String containerId)
@@ -83,7 +80,10 @@ public class ContainerOperationClient implements ScmClient {
XceiverClientSpi client = null;
try {
Pipeline pipeline =
- storageContainerLocationClient.allocateContainer(containerId);
+ storageContainerLocationClient.allocateContainer(
+ xceiverClientManager.getType(),
+ xceiverClientManager.getFactor(), containerId);
+
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
ContainerProtocolCalls.createContainer(client, traceID);
@@ -101,21 +101,18 @@ public class ContainerOperationClient implements ScmClient {
}
/**
- * Creates a Container on SCM with specified replication factor.
- * @param containerId - String container ID
- * @param replicationFactor - replication factor
- * @return Pipeline
- * @throws IOException
+ * @inheritDoc
*/
@Override
- public Pipeline createContainer(String containerId,
- ScmClient.ReplicationFactor replicationFactor) throws IOException {
+ public Pipeline createContainer(OzoneProtos.ReplicationType type,
+ OzoneProtos.ReplicationFactor factor,
+ String containerId) throws IOException {
XceiverClientSpi client = null;
try {
// allocate container on SCM.
Pipeline pipeline =
- storageContainerLocationClient.allocateContainer(containerId,
- replicationFactor);
+ storageContainerLocationClient.allocateContainer(type, factor,
+ containerId);
// connect to pipeline leader and allocate container on leader datanode.
client = xceiverClientManager.acquireClient(pipeline);
String traceID = UUID.randomUUID().toString();
@@ -123,7 +120,7 @@ public class ContainerOperationClient implements ScmClient {
LOG.info("Created container " + containerId +
" leader:" + pipeline.getLeader() +
" machines:" + pipeline.getMachines() +
- " replication factor:" + replicationFactor.getValue());
+ " replication factor:" + factor);
return pipeline;
} finally {
if (client != null) {
@@ -150,6 +147,17 @@ public class ContainerOperationClient implements ScmClient {
}
/**
+ * Creates a specified replication pipeline.
+ */
+ @Override
+ public Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type,
+ OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool)
+ throws IOException {
+ return storageContainerLocationClient.createReplicationPipeline(type,
+ factor, nodePool);
+ }
+
+ /**
* Delete the container, this will release any resource it uses.
* @param pipeline - Pipeline that represents the container.
* @param force - True to forcibly delete the container.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java
index c651a8b..2c2d244 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java
@@ -95,41 +95,16 @@ public interface ScmClient {
long getContainerSize(Pipeline pipeline) throws IOException;
/**
- * Replication factors supported by Ozone and SCM.
- */
- enum ReplicationFactor{
- ONE(1),
- THREE(3);
-
- private final int value;
- ReplicationFactor(int value) {
- this.value = value;
- }
-
- public int getValue() {
- return value;
- }
-
- public static ReplicationFactor parseReplicationFactor(int i) {
- switch (i) {
- case 1: return ONE;
- case 3: return THREE;
- default:
- throw new IllegalArgumentException("Only replication factor 1 or 3" +
- " is supported by Ozone/SCM.");
- }
- }
- }
-
- /**
* Creates a Container on SCM and returns the pipeline.
- * @param containerId - String container ID
- * @param replicationFactor - replication factor (only 1/3 is supported)
+ * @param type - Replication Type.
+ * @param replicationFactor - Replication Factor
+ * @param containerId - Container ID
* @return Pipeline
- * @throws IOException
+ * @throws IOException - in case of error.
*/
- Pipeline createContainer(String containerId,
- ReplicationFactor replicationFactor) throws IOException;
+ Pipeline createContainer(OzoneProtos.ReplicationType type,
+ OzoneProtos.ReplicationFactor replicationFactor, String containerId)
+ throws IOException;
/**
* Returns a set of Nodes that meet a query criteria.
@@ -141,4 +116,15 @@ public interface ScmClient {
*/
OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState> nodeStatuses,
OzoneProtos.QueryScope queryScope, String poolName) throws IOException;
+
+ /**
+ * Creates a specified replication pipeline.
+ * @param type - Type
+ * @param factor - Replication factor
+ * @param nodePool - Set of machines.
+ * @throws IOException
+ */
+ Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type,
+ OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool)
+ throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
index 6bb5800..ea0893e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
@@ -1,19 +1,18 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
*/
package org.apache.hadoop.scm.protocol;
@@ -22,7 +21,6 @@ import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
-import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
@@ -31,26 +29,14 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
* that currently host a container.
*/
public interface StorageContainerLocationProtocol {
-
- /**
- * Asks SCM where a container should be allocated. SCM responds with the
- * set of datanodes that should be used creating this container.
- * @param containerName - Name of the container.
- * @return Pipeline.
- * @throws IOException
- */
- Pipeline allocateContainer(String containerName) throws IOException;
-
/**
* Asks SCM where a container should be allocated. SCM responds with the
* set of datanodes that should be used creating this container.
- * @param containerName - Name of the container.
- * @param replicationFactor - replication factor.
- * @return Pipeline.
- * @throws IOException
+ *
*/
- Pipeline allocateContainer(String containerName,
- ScmClient.ReplicationFactor replicationFactor) throws IOException;
+ Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType,
+ OzoneProtos.ReplicationFactor factor,
+ String containerName) throws IOException;
/**
* Ask SCM the location of the container. SCM responds with a group of
@@ -99,4 +85,15 @@ public interface StorageContainerLocationProtocol {
OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState> nodeStatuses,
OzoneProtos.QueryScope queryScope, String poolName) throws IOException;
+ /**
+ * Creates a replication pipeline of a specified type.
+ * @param type - replication type
+ * @param factor - factor 1 or 3
+ * @param nodePool - optional machine list to build a pipeline.
+ * @throws IOException
+ */
+ Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type,
+ OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool)
+ throws IOException;
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 7ec4a86..93cd0cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -21,12 +21,10 @@ import com.google.common.base.Strings;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
@@ -37,6 +35,8 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolPr
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@@ -74,37 +74,27 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
/**
* Asks SCM where a container should be allocated. SCM responds with the set
- * of datanodes that should be used creating this container.
- *
- * @param containerName - Name of the container.
- * @return Pipeline.
- * @throws IOException
- */
- @Override
- public Pipeline allocateContainer(String containerName) throws IOException {
- return allocateContainer(containerName, ScmClient.ReplicationFactor.ONE);
- }
-
- /**
- * Asks SCM where a container should be allocated. SCM responds with the set
* of datanodes that should be used creating this container. Ozone/SCM only
* supports replication factor of either 1 or 3.
- *
- * @param containerName - Name of the container.
- * @param replicationFactor - replication factor.
- * @return Pipeline.
+ * @param type - Replication Type
+ * @param factor - Replication Count
+ * @param containerName - Name
+ * @return
* @throws IOException
*/
@Override
- public Pipeline allocateContainer(String containerName,
- ScmClient.ReplicationFactor replicationFactor) throws IOException {
+ public Pipeline allocateContainer(OzoneProtos.ReplicationType type,
+ OzoneProtos.ReplicationFactor factor, String
+ containerName) throws IOException {
Preconditions.checkNotNull(containerName, "Container Name cannot be Null");
Preconditions.checkState(!containerName.isEmpty(), "Container name cannot" +
" be empty");
ContainerRequestProto request = ContainerRequestProto.newBuilder()
- .setContainerName(containerName).setReplicationFactor(PBHelperClient
- .convertReplicationFactor(replicationFactor)).build();
+ .setContainerName(containerName)
+ .setReplicationFactor(factor)
+ .setReplicationType(type)
+ .build();
final ContainerResponseProto response;
try {
@@ -217,6 +207,42 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
}
+ /**
+ * Creates a replication pipeline of a specified type.
+ *
+ * @param replicationType - replication type
+ * @param factor - factor 1 or 3
+ * @param nodePool - optional machine list to build a pipeline.
+ * @throws IOException
+ */
+ @Override
+ public Pipeline createReplicationPipeline(OzoneProtos.ReplicationType
+ replicationType, OzoneProtos.ReplicationFactor factor, OzoneProtos
+ .NodePool nodePool) throws IOException {
+ PipelineRequestProto request = PipelineRequestProto.newBuilder()
+ .setNodePool(nodePool)
+ .setReplicationFactor(factor)
+ .setReplicationType(replicationType)
+ .build();
+ try {
+ PipelineResponseProto response =
+ rpcProxy.allocatePipeline(NULL_RPC_CONTROLLER, request);
+ if (response.getErrorCode() ==
+ PipelineResponseProto.Error.success) {
+ Preconditions.checkState(response.hasPipeline(), "With success, " +
+ "must come a pipeline");
+ return Pipeline.getFromProtoBuf(response.getPipeline());
+ } else {
+ String errorMessage = String.format("create replication pipeline " +
+ "failed. code : %s Message: %s", response.getErrorCode(),
+ response.hasErrorMessage() ? response.getErrorMessage() : "");
+ throw new IOException(errorMessage);
+ }
+ } catch (ServiceException e) {
+ throw ProtobufHelper.getRemoteException(e);
+ }
+ }
+
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
index bedd9a8..a2acdff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
@@ -41,7 +41,7 @@ public interface RatisHelper {
Logger LOG = LoggerFactory.getLogger(RatisHelper.class);
static String toRaftPeerIdString(DatanodeID id) {
- return id.getIpAddr() + ":" + id.getContainerPort();
+ return id.getIpAddr() + ":" + id.getRatisPort();
}
static RaftPeerId toRaftPeerId(DatanodeID id) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
index ede6ea9..50926c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
@@ -35,6 +35,7 @@ message Pipeline {
required string leaderID = 1;
repeated DatanodeIDProto members = 2;
required string containerName = 3;
+ optional LifeCycleStates state = 4 [default = OPERATIONAL];
}
message KeyValue {
@@ -71,3 +72,22 @@ message NodePool {
repeated Node nodes = 1;
}
+
+enum ReplicationType {
+ RATIS = 1;
+ STAND_ALONE = 2;
+ CHAINED = 3;
+}
+
+
+enum ReplicationFactor {
+ ONE = 1;
+ THREE = 3;
+}
+
+enum LifeCycleStates {
+ CLIENT_CREATE = 1; // A request to client to create this object
+ OPERATIONAL = 2; // Mostly an update to SCM via HB or client call.
+ TIMED_OUT = 3; // creation has timed out from SCM's View.
+ DELETED = 4; // object is deleted.
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
index 6c16347..30c7166 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
@@ -37,11 +37,9 @@ import "Ozone.proto";
message ContainerRequestProto {
required string containerName = 1;
// Ozone only support replciation of either 1 or 3.
- enum ReplicationFactor {
- ONE = 1;
- THREE = 3;
- }
- required ReplicationFactor replicationFactor = 2;
+ required hadoop.hdfs.ozone.ReplicationFactor replicationFactor = 2;
+ required hadoop.hdfs.ozone.ReplicationType replicationType = 3;
+
}
/**
@@ -111,6 +109,28 @@ message NodeQueryResponseProto {
required hadoop.hdfs.ozone.NodePool datanodes = 1;
}
+/**
+ Request to create a replication pipeline.
+ */
+message PipelineRequestProto {
+ required hadoop.hdfs.ozone.ReplicationType replicationType = 1;
+ required hadoop.hdfs.ozone.ReplicationFactor replicationFactor = 2;
+
+ // if datanodes are specified then pipelines are created using those
+ // datanodes.
+ optional hadoop.hdfs.ozone.NodePool nodePool = 3;
+ optional string pipelineID = 4;
+}
+
+message PipelineResponseProto {
+ enum Error {
+ success = 1;
+ errorPipelineAlreadyExists = 2;
+ }
+ required Error errorCode = 1;
+ optional hadoop.hdfs.ozone.Pipeline pipeline = 2;
+ optional string errorMessage = 3;
+}
/**
* Protocol used from an HDFS node to StorageContainerManager. See the request
@@ -139,4 +159,21 @@ service StorageContainerLocationProtocolService {
* Returns a set of Nodes that meet a criteria.
*/
rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto);
+
+ /*
+ * Apis that Manage Pipelines.
+ *
+ * Pipelines are abstractions offered by SCM and Datanode that allows users
+ * to create a replication pipeline.
+ *
+ * These following APIs allow command line programs like SCM CLI to list
+ * and manage pipelines.
+ */
+
+ /**
+ * Creates a replication pipeline.
+ */
+ rpc allocatePipeline(PipelineRequestProto)
+ returns (PipelineResponseProto);
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 06d6802..497d734 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -58,7 +58,8 @@ message DatanodeIDProto {
required uint32 infoPort = 5; // datanode http port
required uint32 ipcPort = 6; // ipc server port
optional uint32 infoSecurePort = 7 [default = 0]; // datanode https port
- optional uint32 containerPort = 8 [default = 0]; // Ozone container protocol
+ optional uint32 containerPort = 8 [default = 0]; // Ozone stand_alone protocol
+ optional uint32 ratisPort = 9 [default = 0]; //Ozone ratis port
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
index 368c250..1f22aa8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.cblock.meta.VolumeInfo;
import org.apache.hadoop.cblock.proto.MountVolumeResponse;
import org.apache.hadoop.cblock.util.KeyUtil;
import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.slf4j.Logger;
@@ -179,9 +180,10 @@ public class StorageManager {
long allocatedSize = 0;
ArrayList<String> containerIds = new ArrayList<>();
while (allocatedSize < volumeSize) {
- Pipeline pipeline = storageClient.createContainer(
- KeyUtil.getContainerName(userName, volumeName, containerIdx),
- ScmClient.ReplicationFactor.ONE);
+ Pipeline pipeline = storageClient.createContainer(OzoneProtos
+ .ReplicationType.STAND_ALONE,
+ OzoneProtos.ReplicationFactor.ONE,
+ KeyUtil.getContainerName(userName, volumeName, containerIdx));
ContainerDescriptor container =
new ContainerDescriptor(pipeline.getContainerName());
container.setPipeline(pipeline);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 5e0a656..66992af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler;
@@ -72,7 +73,7 @@ public class DatanodeStateMachine implements Closeable {
context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
heartbeatFrequency = TimeUnit.SECONDS.toMillis(
OzoneClientUtils.getScmHeartbeatInterval(conf));
- container = new OzoneContainer(conf);
+ container = new OzoneContainer(datanodeID, new OzoneConfiguration(conf));
this.datanodeID = datanodeID;
nextHB = new AtomicLong(Time.monotonicNow());
@@ -87,11 +88,6 @@ public class DatanodeStateMachine implements Closeable {
.build();
}
- public DatanodeStateMachine(Configuration conf)
- throws IOException {
- this(null, conf);
- }
-
public void setDatanodeID(DatanodeID datanodeID) {
this.datanodeID = datanodeID;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index eb2ca3a..994b245 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ReportState.states
.noContainerReports;
@@ -90,7 +91,16 @@ public class StateContext {
*/
public int getContainerPort() {
return parent == null ?
- -1 : parent.getContainer().getContainerServerPort();
+ INVALID_PORT : parent.getContainer().getContainerServerPort();
+ }
+
+ /**
+ * Gets the Ratis Port.
+ * @return int , return -1 if not valid.
+ */
+ public int getRatisPort() {
+ return parent == null ?
+ INVALID_PORT : parent.getContainer().getRatisContainerServerPort();
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
index d35f64d..0552a2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
@@ -113,9 +113,11 @@ public class InitDatanodeState implements DatanodeState,
}
File idPath = new File(dataNodeIDPath);
int containerPort = this.context.getContainerPort();
+ int ratisPort = this.context.getRatisPort();
DatanodeID datanodeID = this.context.getParent().getDatanodeID();
if (datanodeID != null) {
datanodeID.setContainerPort(containerPort);
+ datanodeID.setRatisPort(ratisPort);
ContainerUtils.writeDatanodeIDTo(datanodeID, idPath);
LOG.info("Datanode ID is persisted to {}", dataNodeIDPath);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
index 3a6e672..7271cb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
@@ -29,6 +29,7 @@ import io.netty.handler.logging.LoggingHandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,6 +82,16 @@ public final class XceiverServer implements XceiverServerSpi {
return this.port;
}
+ /**
+ * Returns the Replication type supported by this end-point.
+ *
+ * @return enum -- {Stand_Alone, Ratis, Chained}
+ */
+ @Override
+ public OzoneProtos.ReplicationType getServerType() {
+ return OzoneProtos.ReplicationType.STAND_ALONE;
+ }
+
@Override
public void start() throws IOException {
bossGroup = new NioEventLoopGroup();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index e5b0497..b61d7fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.ozone.container.common.transport.server;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+
import java.io.IOException;
/** A server endpoint that acts as the communication layer for Ozone
@@ -31,4 +33,11 @@ public interface XceiverServerSpi {
/** Get server IPC port. */
int getIPCPort();
+
+ /**
+ * Returns the Replication type supported by this end-point.
+ * @return enum -- {Stand_Alone, Ratis, Chained}
+ */
+ OzoneProtos.ReplicationType getServerType();
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 69f3801..ba613c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -18,10 +18,15 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server
+ .XceiverServerSpi;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
@@ -34,7 +39,9 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
+import java.net.ServerSocket;
import java.util.Collections;
import java.util.Objects;
@@ -44,6 +51,22 @@ import java.util.Objects;
*/
public final class XceiverServerRatis implements XceiverServerSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
+ private final int port;
+ private final RaftServer server;
+
+ private XceiverServerRatis(
+ String id, int port, String storageDir,
+ ContainerDispatcher dispatcher, RpcType rpcType) throws IOException {
+ Objects.requireNonNull(id, "id == null");
+ this.port = port;
+
+ this.server = RaftServer.newBuilder()
+ .setServerId(RaftPeerId.valueOf(id))
+ .setPeers(Collections.emptyList())
+ .setProperties(newRaftProperties(rpcType, port, storageDir))
+ .setStateMachine(new ContainerStateMachine(dispatcher))
+ .build();
+ }
static RaftProperties newRaftProperties(
RpcType rpc, int port, String storageDir) {
@@ -52,44 +75,60 @@ public final class XceiverServerRatis implements XceiverServerSpi {
RaftConfigKeys.Rpc.setType(properties, rpc);
if (rpc == SupportedRpcType.GRPC) {
GrpcConfigKeys.Server.setPort(properties, port);
- } else if (rpc == SupportedRpcType.NETTY) {
- NettyConfigKeys.Server.setPort(properties, port);
+ } else {
+ if (rpc == SupportedRpcType.NETTY) {
+ NettyConfigKeys.Server.setPort(properties, port);
+ }
}
return properties;
}
- public static XceiverServerRatis newXceiverServerRatis(
+ public static XceiverServerRatis newXceiverServerRatis(String datanodeID,
Configuration ozoneConf, ContainerDispatcher dispatcher)
throws IOException {
- final String id = ozoneConf.get(
- OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID);
- final int port = ozoneConf.getInt(
- OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
- OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
- final String storageDir = ozoneConf.get(
+ final String ratisDir = File.separator + "ratis";
+ int localPort = ozoneConf.getInt(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT);
+ String storageDir = ozoneConf.get(
OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
+
+ if (Strings.isNullOrEmpty(storageDir)) {
+ storageDir = ozoneConf.get(OzoneConfigKeys
+ .OZONE_CONTAINER_METADATA_DIRS);
+ Preconditions.checkNotNull(storageDir, "ozone.container.metadata.dirs " +
+ "cannot be null, Please check your configs.");
+ storageDir = storageDir.concat(ratisDir);
+ LOG.warn("Storage directory for Ratis is not configured. Mapping Ratis " +
+ "storage under {}. It is a good idea to map this to an SSD disk.",
+ storageDir);
+ }
final String rpcType = ozoneConf.get(
OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
- return new XceiverServerRatis(id, port, storageDir, dispatcher, rpc);
- }
- private final int port;
- private final RaftServer server;
-
- private XceiverServerRatis(
- String id, int port, String storageDir,
- ContainerDispatcher dispatcher, RpcType rpcType) throws IOException {
- Objects.requireNonNull(id, "id == null");
- this.port = port;
-
- this.server = RaftServer.newBuilder()
- .setServerId(RaftPeerId.valueOf(id))
- .setPeers(Collections.emptyList())
- .setProperties(newRaftProperties(rpcType, port, storageDir))
- .setStateMachine(new ContainerStateMachine(dispatcher))
- .build();
+ // Get an available port on current node and
+ // use that as the container port
+ if (ozoneConf.getBoolean(OzoneConfigKeys
+ .DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ socket.setReuseAddress(true);
+ localPort = socket.getLocalPort();
+ LOG.info("Found a free port for the server : {}", localPort);
+ // If we have random local ports configured this means that it
+ // probably running under MiniOzoneCluster. Ratis locks the storage
+ // directories, so we need to pass different local directory for each
+ // local instance. So we map ratis directories under datanode ID.
+ storageDir = storageDir.concat(File.separator + datanodeID);
+ } catch (IOException e) {
+ LOG.error("Unable find a random free port for the server, "
+ + "fallback to use default port {}", localPort, e);
+ }
+ }
+ return new XceiverServerRatis(datanodeID, localPort, storageDir,
+ dispatcher, rpc);
}
@Override
@@ -112,4 +151,14 @@ public final class XceiverServerRatis implements XceiverServerSpi {
public int getIPCPort() {
return port;
}
+
+ /**
+ * Returns the Replication type supported by this end-point.
+ *
+ * @return enum -- {Stand_Alone, Ratis, Chained}
+ */
+ @Override
+ public OzoneProtos.ReplicationType getServerType() {
+ return OzoneProtos.ReplicationType.RATIS;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 792c132..307f59f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
@@ -33,6 +34,8 @@ import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
import org.apache.hadoop.ozone.container.common.statemachine.background.BlockDeletingService;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -50,6 +53,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
/**
* Ozone main class sets up the network server and initializes the container
@@ -62,7 +66,7 @@ public class OzoneContainer {
private final Configuration ozoneConfig;
private final ContainerDispatcher dispatcher;
private final ContainerManager manager;
- private final XceiverServerSpi server;
+ private final XceiverServerSpi[] server;
private final ChunkManager chunkManager;
private final KeyManager keyManager;
private final BlockDeletingService blockDeletingService;
@@ -73,8 +77,8 @@ public class OzoneContainer {
* @param ozoneConfig - Config
* @throws IOException
*/
- public OzoneContainer(
- Configuration ozoneConfig) throws IOException {
+ public OzoneContainer(DatanodeID datanodeID, Configuration ozoneConfig) throws
+ IOException {
this.ozoneConfig = ozoneConfig;
List<StorageLocation> locations = new LinkedList<>();
String[] paths = ozoneConfig.getStrings(
@@ -104,12 +108,11 @@ public class OzoneContainer {
this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
- final boolean useRatis = ozoneConfig.getBoolean(
- OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
- OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
- server = useRatis?
- XceiverServerRatis.newXceiverServerRatis(ozoneConfig, dispatcher)
- : new XceiverServer(this.ozoneConfig, this.dispatcher);
+ server = new XceiverServerSpi[]{
+ new XceiverServer(this.ozoneConfig, this.dispatcher),
+ XceiverServerRatis.newXceiverServerRatis(datanodeID
+ .getDatanodeUuid().toString(), ozoneConfig, dispatcher)
+ };
}
/**
@@ -118,7 +121,9 @@ public class OzoneContainer {
* @throws IOException
*/
public void start() throws IOException {
- server.start();
+ for (XceiverServerSpi serverinstance : server) {
+ serverinstance.start();
+ }
blockDeletingService.start();
dispatcher.init();
}
@@ -157,7 +162,9 @@ public class OzoneContainer {
*/
public void stop() {
LOG.info("Attempting to stop container services.");
- server.stop();
+ for(XceiverServerSpi serverinstance: server) {
+ serverinstance.stop();
+ }
dispatcher.shutdown();
try {
@@ -194,13 +201,31 @@ public class OzoneContainer {
return this.manager.getNodeReport();
}
+ private int getPortbyType(OzoneProtos.ReplicationType replicationType) {
+ for (XceiverServerSpi serverinstance : server) {
+ if (serverinstance.getServerType() == replicationType) {
+ return serverinstance.getIPCPort();
+ }
+ }
+ return INVALID_PORT;
+ }
+
/**
* Returns the container server IPC port.
*
* @return Container server IPC port.
*/
public int getContainerServerPort() {
- return server.getIPCPort();
+ return getPortbyType(OzoneProtos.ReplicationType.STAND_ALONE);
+ }
+
+ /**
+ * Returns the Ratis container Server IPC port.
+ *
+ * @return Ratis port.
+ */
+ public int getRatisContainerServerPort() {
+ return getPortbyType(OzoneProtos.ReplicationType.RATIS);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index e45afb9..628de42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -1,3 +1,4 @@
+
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -30,22 +31,17 @@ import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
-import static org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.ContainerRequestProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.ContainerResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.GetContainerRequestProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.GetContainerResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.DeleteContainerRequestProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.DeleteContainerResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.ListContainerResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos.ListContainerRequestProto;
+import static org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.DeleteContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.DeleteContainerResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
+
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
@@ -74,7 +70,8 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
public ContainerResponseProto allocateContainer(RpcController unused,
ContainerRequestProto request) throws ServiceException {
try {
- Pipeline pipeline = impl.allocateContainer(request.getContainerName());
+ Pipeline pipeline = impl.allocateContainer(request.getReplicationType(),
+ request.getReplicationFactor(), request.getContainerName());
return ContainerResponseProto.newBuilder()
.setPipeline(pipeline.getProtobufMessage())
.setErrorCode(ContainerResponseProto.Error.success)
@@ -161,4 +158,12 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
throw new ServiceException(e);
}
}
+
+ @Override
+ public PipelineResponseProto allocatePipeline(
+ RpcController controller, PipelineRequestProto request)
+ throws ServiceException {
+ // TODO : Wiring this up requires one more patch.
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index e8cc8f0..e320983 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -39,41 +39,24 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeAddressList;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerDatanodeProtocolProtos.Type;
-import org.apache.hadoop.ozone.protocol.proto
- .StorageContainerLocationProtocolProtos;
-import org.apache.hadoop.ozone.protocolPB
- .ScmBlockLocationProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeAddressList;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
+import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
-import org.apache.hadoop.ozone.protocolPB
- .StorageContainerDatanodeProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ozone.protocolPB
- .StorageContainerLocationProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.scm.block.BlockManager;
import org.apache.hadoop.ozone.scm.block.BlockManagerImpl;
import org.apache.hadoop.ozone.scm.container.ContainerMapping;
@@ -81,7 +64,6 @@ import org.apache.hadoop.ozone.scm.container.Mapping;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
-import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@@ -386,21 +368,6 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
}
/**
- * Asks SCM where a container should be allocated. SCM responds with the set
- * of datanodes that should be used creating this container.
- *
- * @param containerName - Name of the container.
- * @return Pipeline.
- * @throws IOException
- */
- @Override
- public Pipeline allocateContainer(String containerName) throws IOException {
- checkAdminAccess();
- return scmContainerManager.allocateContainer(containerName,
- ScmClient.ReplicationFactor.ONE);
- }
-
- /**
* {@inheritDoc}
*/
@Override
@@ -458,6 +425,19 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
}
/**
+ * Creates a replication pipeline of a specified type.
+ */
+ @Override
+ public Pipeline createReplicationPipeline(
+ OzoneProtos.ReplicationType replicationType,
+ OzoneProtos.ReplicationFactor factor,
+ OzoneProtos.NodePool nodePool)
+ throws IOException {
+ // TODO: will be addressed in future patch.
+ return null;
+ }
+
+ /**
* Queries a list of Node that match a set of statuses.
* <p>
* For example, if the nodeStatuses is HEALTHY and RAFT_MEMBER,
@@ -527,11 +507,12 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
* @throws IOException
*/
@Override
- public Pipeline allocateContainer(String containerName,
- ScmClient.ReplicationFactor replicationFactor) throws IOException {
+ public Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType,
+ OzoneProtos.ReplicationFactor replicationFactor, String containerName)
+ throws IOException {
checkAdminAccess();
- return scmContainerManager.allocateContainer(containerName,
- replicationFactor);
+ return scmContainerManager.allocateContainer(replicationType,
+ replicationFactor, containerName);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index 0eb60e4..e000ccc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.scm.container.Mapping;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
@@ -177,7 +178,12 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
for (int i = 0; i < count; i++) {
String containerName = UUID.randomUUID().toString();
try {
- Pipeline pipeline = containerManager.allocateContainer(containerName);
+ // TODO: Fix this later when Ratis is made the Default.
+ Pipeline pipeline = containerManager.allocateContainer(
+ OzoneProtos.ReplicationType.STAND_ALONE,
+ OzoneProtos.ReplicationFactor.ONE,
+ containerName);
+
if (pipeline == null) {
LOG.warn("Unable to allocate container.");
continue;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
index 643779d..8daa5d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
@@ -1,4 +1,3 @@
-
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
@@ -22,15 +21,11 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
-import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.scm.ScmConfigKeys;
-import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
@@ -41,8 +36,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
@@ -65,8 +58,7 @@ public class ContainerMapping implements Mapping {
private final Lock lock;
private final Charset encoding = Charset.forName("UTF-8");
private final MetadataStore containerStore;
- private final ContainerPlacementPolicy placementPolicy;
- private final long containerSize;
+ private final PipelineSelector pipelineSelector;
/**
* Constructs a mapping class that creates mapping between container names and
@@ -96,66 +88,10 @@ public class ContainerMapping implements Mapping {
.build();
this.lock = new ReentrantLock();
-
- this.containerSize = OzoneConsts.GB * conf.getInt(
- ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
- ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
- this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf);
+ this.pipelineSelector = new PipelineSelector(nodeManager, conf);
}
- /**
- * Create pluggable container placement policy implementation instance.
- *
- * @param nodeManager - SCM node manager.
- * @param conf - configuration.
- * @return SCM container placement policy implementation instance.
- */
- @SuppressWarnings("unchecked")
- private static ContainerPlacementPolicy createContainerPlacementPolicy(
- final NodeManager nodeManager, final Configuration conf) {
- Class<? extends ContainerPlacementPolicy> implClass =
- (Class<? extends ContainerPlacementPolicy>) conf.getClass(
- ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
- SCMContainerPlacementRandom.class);
- try {
- Constructor<? extends ContainerPlacementPolicy> ctor =
- implClass.getDeclaredConstructor(NodeManager.class,
- Configuration.class);
- return ctor.newInstance(nodeManager, conf);
- } catch (RuntimeException e) {
- throw e;
- } catch (InvocationTargetException e) {
- throw new RuntimeException(implClass.getName()
- + " could not be constructed.", e.getCause());
- } catch (Exception e) {
- LOG.error("Unhandled exception occured, Placement policy will not be " +
- "functional.");
- throw new IllegalArgumentException("Unable to load " +
- "ContainerPlacementPolicy", e);
- }
- }
-
- /**
- * Translates a list of nodes, ordered such that the first is the leader, into
- * a corresponding {@link Pipeline} object.
- * @param nodes - list of datanodes on which we will allocate the container.
- * The first of the list will be the leader node.
- * @param containerName container name
- * @return pipeline corresponding to nodes
- */
- private static Pipeline newPipelineFromNodes(final List<DatanodeID> nodes,
- final String containerName) {
- Preconditions.checkNotNull(nodes);
- Preconditions.checkArgument(nodes.size() > 0);
- String leaderId = nodes.get(0).getDatanodeUuid();
- Pipeline pipeline = new Pipeline(leaderId);
- for (DatanodeID node : nodes) {
- pipeline.addMember(node);
- }
- pipeline.setContainerName(containerName);
- return pipeline;
- }
/**
* Returns the Pipeline from the container name.
@@ -192,7 +128,7 @@ public class ContainerMapping implements Mapping {
List<Pipeline> pipelineList = new ArrayList<>();
lock.lock();
try {
- if(containerStore.isEmpty()) {
+ if (containerStore.isEmpty()) {
throw new IOException("No container exists in current db");
}
MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefixName);
@@ -217,26 +153,14 @@ public class ContainerMapping implements Mapping {
* Allocates a new container.
*
* @param containerName - Name of the container.
- * @return - Pipeline that makes up this container.
- * @throws IOException
- */
- @Override
- public Pipeline allocateContainer(final String containerName)
- throws IOException {
- return allocateContainer(containerName, ScmClient.ReplicationFactor.ONE);
- }
-
- /**
- * Allocates a new container.
- *
- * @param containerName - Name of the container.
* @param replicationFactor - replication factor of the container.
* @return - Pipeline that makes up this container.
- * @throws IOException
+ * @throws IOException - Exception
*/
@Override
- public Pipeline allocateContainer(final String containerName,
- final ScmClient.ReplicationFactor replicationFactor) throws IOException {
+ public Pipeline allocateContainer(OzoneProtos.ReplicationType type,
+ OzoneProtos.ReplicationFactor replicationFactor,
+ final String containerName) throws IOException {
Preconditions.checkNotNull(containerName);
Preconditions.checkState(!containerName.isEmpty());
Pipeline pipeline = null;
@@ -253,18 +177,10 @@ public class ContainerMapping implements Mapping {
throw new SCMException("Specified container already exists. key : " +
containerName, SCMException.ResultCodes.CONTAINER_EXISTS);
}
- List<DatanodeID> datanodes = placementPolicy.chooseDatanodes(
- replicationFactor.getValue(), containerSize);
- // TODO: handle under replicated container
- if (datanodes != null && datanodes.size() > 0) {
- pipeline = newPipelineFromNodes(datanodes, containerName);
- containerStore.put(containerName.getBytes(encoding),
- pipeline.getProtobufMessage().toByteArray());
- } else {
- LOG.debug("Unable to find enough datanodes for new container. " +
- "Required {} found {}", replicationFactor,
- datanodes != null ? datanodes.size(): 0);
- }
+ pipeline = pipelineSelector.getReplicationPipeline(type,
+ replicationFactor, containerName);
+ containerStore.put(containerName.getBytes(encoding),
+ pipeline.getProtobufMessage().toByteArray());
} finally {
lock.unlock();
}
@@ -275,9 +191,8 @@ public class ContainerMapping implements Mapping {
* Deletes a container from SCM.
*
* @param containerName - Container name
- * @throws IOException
- * if container doesn't exist
- * or container store failed to delete the specified key.
+ * @throws IOException if container doesn't exist or container store failed to
+ * delete the specified key.
*/
@Override
public void deleteContainer(String containerName) throws IOException {
@@ -286,7 +201,7 @@ public class ContainerMapping implements Mapping {
byte[] dbKey = containerName.getBytes(encoding);
byte[] pipelineBytes =
containerStore.get(dbKey);
- if(pipelineBytes == null) {
+ if (pipelineBytes == null) {
throw new SCMException("Failed to delete container "
+ containerName + ", reason : container doesn't exist.",
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
index 194158b..1ef3572 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
@@ -17,7 +17,7 @@
package org.apache.hadoop.ozone.scm.container;
-import org.apache.hadoop.scm.client.ScmClient;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import java.io.Closeable;
@@ -57,14 +57,6 @@ public interface Mapping extends Closeable {
List<Pipeline> listContainer(String startName, String prefixName, int count)
throws IOException;
- /**
- * Allocates a new container for a given keyName.
- *
- * @param containerName - Name
- * @return - Pipeline that makes up this container.
- * @throws IOException
- */
- Pipeline allocateContainer(String containerName) throws IOException;
/**
* Allocates a new container for a given keyName and replication factor.
@@ -74,8 +66,9 @@ public interface Mapping extends Closeable {
* @return - Pipeline that makes up this container.
* @throws IOException
*/
- Pipeline allocateContainer(String containerName,
- ScmClient.ReplicationFactor replicationFactor) throws IOException;
+ Pipeline allocateContainer(OzoneProtos.ReplicationType type,
+ OzoneProtos.ReplicationFactor replicationFactor,
+ String containerName) throws IOException;
/**
* Deletes a container from SCM.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java
new file mode 100644
index 0000000..6293d84
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.pipelines;
+
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Manage Ozone pipelines.
+ */
+public interface PipelineManager {
+
+ /**
+ * This function is called by the Container Manager while allocating a new
+ * container. The client specifies what kind of replication pipeline is
+ * needed and based on the replication type in the request appropriate
+ * Interface is invoked.
+ *
+ * @param containerName Name of the container
+ * @param replicationFactor - Replication Factor
+ * @return a Pipeline.
+ */
+ Pipeline getPipeline(String containerName,
+ OzoneProtos.ReplicationFactor replicationFactor) throws IOException;
+
+ /**
+ * Creates a pipeline from a specified set of Nodes.
+ * @param pipelineID - Name of the pipeline
+ * @param datanodes - The list of datanodes that make this pipeline.
+ */
+ void createPipeline(String pipelineID, List<DatanodeID> datanodes)
+ throws IOException;;
+
+ /**
+ * Close the pipeline with the given clusterId.
+ */
+ void closePipeline(String pipelineID) throws IOException;
+
+ /**
+ * list members in the pipeline .
+ * @return the datanode
+ */
+ List<DatanodeID> getMembers(String pipelineID) throws IOException;
+
+ /**
+ * Update the datanode list of the pipeline.
+ */
+ void updatePipeline(String pipelineID, List<DatanodeID> newDatanodes)
+ throws IOException;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org