You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/08/18 02:47:48 UTC

[3/3] hadoop git commit: HDFS-12159. Ozone: SCM: Add create replication pipeline RPC. Contributed by Anu Engineer.

HDFS-12159. Ozone: SCM: Add create replication pipeline RPC. Contributed by Anu Engineer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a245c60b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a245c60b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a245c60b

Branch: refs/heads/HDFS-7240
Commit: a245c60bb0eb7e5247d90b67c8947ab552af1e9f
Parents: 6c1e9ab
Author: Anu Engineer <ae...@apache.org>
Authored: Thu Aug 17 19:38:26 2017 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Thu Aug 17 19:38:26 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/protocol/DatanodeID.java |  23 ++-
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  15 --
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |  16 ++
 .../org/apache/hadoop/ozone/OzoneConsts.java    |   2 +
 .../apache/hadoop/scm/XceiverClientManager.java |  33 ++++
 .../scm/client/ContainerOperationClient.java    |  38 ++--
 .../org/apache/hadoop/scm/client/ScmClient.java |  50 ++---
 .../StorageContainerLocationProtocol.java       |  59 +++---
 ...rLocationProtocolClientSideTranslatorPB.java |  72 ++++---
 .../main/java/org/apache/ratis/RatisHelper.java |   2 +-
 .../src/main/proto/Ozone.proto                  |  20 ++
 .../StorageContainerLocationProtocol.proto      |  47 ++++-
 .../src/main/proto/hdfs.proto                   |   3 +-
 .../hadoop/cblock/storage/StorageManager.java   |   8 +-
 .../statemachine/DatanodeStateMachine.java      |   8 +-
 .../common/statemachine/StateContext.java       |  12 +-
 .../states/datanode/InitDatanodeState.java      |   2 +
 .../common/transport/server/XceiverServer.java  |  11 ++
 .../transport/server/XceiverServerSpi.java      |   9 +
 .../server/ratis/XceiverServerRatis.java        | 103 +++++++---
 .../container/ozoneimpl/OzoneContainer.java     |  49 +++--
 ...rLocationProtocolServerSideTranslatorPB.java |  39 ++--
 .../ozone/scm/StorageContainerManager.java      |  89 ++++-----
 .../ozone/scm/block/BlockManagerImpl.java       |   8 +-
 .../ozone/scm/container/ContainerMapping.java   | 115 ++---------
 .../hadoop/ozone/scm/container/Mapping.java     |  15 +-
 .../ozone/scm/pipelines/PipelineManager.java    |  69 +++++++
 .../ozone/scm/pipelines/PipelineSelector.java   | 198 +++++++++++++++++++
 .../ozone/scm/pipelines/package-info.java       |  38 ++++
 .../scm/pipelines/ratis/RatisManagerImpl.java   | 113 +++++++++++
 .../ozone/scm/pipelines/ratis/package-info.java |  18 ++
 .../standalone/StandaloneManagerImpl.java       | 139 +++++++++++++
 .../scm/pipelines/standalone/package-info.java  |  18 ++
 .../hadoop/ozone/scm/ratis/RatisManager.java    |  59 ------
 .../ozone/scm/ratis/RatisManagerImpl.java       | 194 ------------------
 .../src/main/resources/ozone-default.xml        |  17 ++
 .../apache/hadoop/cblock/TestBufferManager.java |   4 +-
 .../hadoop/cblock/TestCBlockReadWrite.java      |   4 +-
 .../hadoop/cblock/TestLocalBlockCache.java      |   4 +-
 .../hadoop/cblock/util/MockStorageClient.java   |  20 +-
 .../namenode/TestFavoredNodesEndToEnd.java      |   6 +-
 .../apache/hadoop/ozone/MiniOzoneCluster.java   |  30 +--
 .../apache/hadoop/ozone/RatisTestHelper.java    |  10 +-
 .../hadoop/ozone/TestContainerOperations.java   |   5 +-
 .../hadoop/ozone/TestMiniOzoneCluster.java      |  34 +++-
 .../ozone/TestStorageContainerManager.java      |  16 +-
 .../common/TestDatanodeStateMachine.java        |  11 +-
 .../ozone/container/common/TestEndPoint.java    |  15 +-
 .../container/ozoneimpl/TestOzoneContainer.java |   4 +-
 .../ozoneimpl/TestOzoneContainerRatis.java      |  67 ++++---
 .../container/ozoneimpl/TestRatisManager.java   |  20 +-
 .../placement/TestContainerPlacement.java       |   3 +-
 .../transport/server/TestContainerServer.java   |  10 +-
 .../hadoop/ozone/scm/TestAllocateContainer.java |  19 +-
 .../ozone/scm/TestContainerSmallFile.java       |  13 +-
 .../org/apache/hadoop/ozone/scm/TestSCMCli.java |  47 +++--
 .../ozone/scm/TestXceiverClientManager.java     |  24 ++-
 .../scm/container/TestContainerMapping.java     |  28 ++-
 .../ozone/scm/node/TestContainerPlacement.java  |  17 +-
 .../ozone/web/client/TestBucketsRatis.java      |   3 +-
 .../hadoop/ozone/web/client/TestKeysRatis.java  |   2 +
 .../ozone/web/client/TestVolumeRatis.java       |   6 +-
 62 files changed, 1369 insertions(+), 764 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
index c563079..517e474 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
@@ -52,7 +52,8 @@ public class DatanodeID implements Comparable<DatanodeID> {
   private int infoSecurePort; // info server port
   private int ipcPort;       // IPC server port
   private String xferAddr;
-  private int containerPort; // container server port.
+  private int containerPort; // container Stand_alone Rpc port.
+  private int ratisPort; // Container Ratis RPC Port.
 
   /**
    * UUID identifying a given datanode. For upgraded Datanodes this is the
@@ -78,7 +79,7 @@ public class DatanodeID implements Comparable<DatanodeID> {
   }
 
   /**
-   * Create a DatanodeID
+   * Create a DatanodeID.
    * @param ipAddr IP
    * @param hostName hostname
    * @param datanodeUuid data node ID, UUID for new Datanodes, may be the
@@ -296,6 +297,22 @@ public class DatanodeID implements Comparable<DatanodeID> {
   }
 
   /**
+   * Gets the Ratis Port.
+   * @return Ratis port.
+   */
+  public int getRatisPort() {
+    return ratisPort;
+  }
+
+  /**
+   * Sets the Ratis Port.
+   * @param ratisPort - Ratis port.
+   */
+  public void setRatisPort(int ratisPort) {
+    this.ratisPort = ratisPort;
+  }
+
+  /**
    * Returns a DataNode ID from the protocol buffers.
    *
    * @param datanodeIDProto - protoBuf Message
@@ -308,6 +325,7 @@ public class DatanodeID implements Comparable<DatanodeID> {
         datanodeIDProto.getXferPort(), datanodeIDProto.getInfoPort(),
         datanodeIDProto.getInfoSecurePort(), datanodeIDProto.getIpcPort());
     id.setContainerPort(datanodeIDProto.getContainerPort());
+    id.setRatisPort(datanodeIDProto.getRatisPort());
     return id;
   }
 
@@ -326,6 +344,7 @@ public class DatanodeID implements Comparable<DatanodeID> {
         .setInfoSecurePort(this.getInfoSecurePort())
         .setIpcPort(this.getIpcPort())
         .setContainerPort(this.getContainerPort())
+        .setRatisPort(this.getRatisPort())
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 72b703a..a16c679 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -184,8 +184,6 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
-import org.apache.hadoop.scm.client.ScmClient;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ChunkedArrayList;
@@ -2489,19 +2487,6 @@ public class PBHelperClient {
     return result;
   }
 
-  public static ContainerRequestProto.ReplicationFactor
-      convertReplicationFactor(ScmClient.ReplicationFactor replicationFactor) {
-    switch (replicationFactor) {
-    case ONE:
-      return ContainerRequestProto.ReplicationFactor.ONE;
-    case THREE:
-      return ContainerRequestProto.ReplicationFactor.THREE;
-    default:
-      throw new IllegalArgumentException("Ozone only supports replicaiton" +
-          " factor 1 or 3");
-    }
-  }
-
   public static XAttr convertXAttr(XAttrProto a) {
     XAttr.Builder builder = new XAttr.Builder();
     builder.setNameSpace(convert(a.getNamespace()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 8a99359..64c7987 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -45,6 +45,22 @@ public final class OzoneConfigKeys {
   public static final boolean DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT =
       false;
 
+  /**
+   * Ratis port that containers listen on.
+   */
+  public static final String DFS_CONTAINER_RATIS_IPC_PORT =
+      "dfs.container.ratis.ipc";
+  public static final int DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT = 50012;
+
+  /**
+   * When set to true, allocate a random free Ratis port for the ozone
+   * container, so a mini cluster can launch multiple containers on a node.
+   */
+  public static final String DFS_CONTAINER_RATIS_IPC_RANDOM_PORT =
+      "dfs.container.ratis.ipc.random.port";
+  public static final boolean DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT =
+      false;
+
   public static final String OZONE_LOCALSTORAGE_ROOT =
       "ozone.localstorage.root";
   public static final String OZONE_LOCALSTORAGE_ROOT_DEFAULT = "/tmp/ozone";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 5e73045..68f1e09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -123,6 +123,8 @@ public final class OzoneConsts {
    */
   public static final int MAX_LISTVOLUMES_SIZE = 1024;
 
+  public static final int INVALID_PORT = -1;
+
   private OzoneConsts() {
     // Never Constructed
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
index b0e4e4e..508c004 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
@@ -31,6 +31,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import static org.apache.hadoop.scm.ScmConfigKeys
@@ -164,4 +165,36 @@ public class XceiverClientManager implements Closeable {
     clientCache.invalidateAll();
     clientCache.cleanUp();
   }
+
+  /**
+   * Tells us if Ratis is enabled for this cluster.
+   * @return True if Ratis is enabled.
+   */
+  public boolean isUseRatis() {
+    return useRatis;
+  }
+
+  /**
+   * Returns the replication factor: THREE when Ratis is enabled, else ONE.
+   * @return THREE if Ratis is enabled, ONE otherwise.
+   */
+  public  OzoneProtos.ReplicationFactor getFactor() {
+    if(isUseRatis()) {
+      return OzoneProtos.ReplicationFactor.THREE;
+    }
+    return OzoneProtos.ReplicationFactor.ONE;
+  }
+
+  /**
+   * Returns the default replication type.
+   * @return Ratis or Standalone
+   */
+  public OzoneProtos.ReplicationType getType() {
+    // TODO : Fix me and make Ratis default before release.
+    if(isUseRatis()) {
+      return OzoneProtos.ReplicationType.RATIS;
+    }
+    return OzoneProtos.ReplicationType.STAND_ALONE;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
index 5ee70bc..a90cff4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
@@ -72,10 +72,7 @@ public class ContainerOperationClient implements ScmClient {
   }
 
   /**
-   * Create a container with the given ID as its name.
-   * @param containerId - String container ID
-   * @return A Pipeline object to actually write/read from the container.
-   * @throws IOException
+   * {@inheritDoc}
    */
   @Override
   public Pipeline createContainer(String containerId)
@@ -83,7 +80,10 @@ public class ContainerOperationClient implements ScmClient {
     XceiverClientSpi client = null;
     try {
       Pipeline pipeline =
-          storageContainerLocationClient.allocateContainer(containerId);
+          storageContainerLocationClient.allocateContainer(
+              xceiverClientManager.getType(),
+              xceiverClientManager.getFactor(), containerId);
+
       client = xceiverClientManager.acquireClient(pipeline);
       String traceID = UUID.randomUUID().toString();
       ContainerProtocolCalls.createContainer(client, traceID);
@@ -101,21 +101,18 @@ public class ContainerOperationClient implements ScmClient {
   }
 
   /**
-   * Creates a Container on SCM with specified replication factor.
-   * @param containerId - String container ID
-   * @param replicationFactor - replication factor
-   * @return Pipeline
-   * @throws IOException
+   * {@inheritDoc}
    */
   @Override
-  public Pipeline createContainer(String containerId,
-      ScmClient.ReplicationFactor replicationFactor) throws IOException {
+  public Pipeline createContainer(OzoneProtos.ReplicationType type,
+      OzoneProtos.ReplicationFactor factor,
+      String containerId) throws IOException {
     XceiverClientSpi client = null;
     try {
       // allocate container on SCM.
       Pipeline pipeline =
-          storageContainerLocationClient.allocateContainer(containerId,
-              replicationFactor);
+          storageContainerLocationClient.allocateContainer(type, factor,
+              containerId);
       // connect to pipeline leader and allocate container on leader datanode.
       client = xceiverClientManager.acquireClient(pipeline);
       String traceID = UUID.randomUUID().toString();
@@ -123,7 +120,7 @@ public class ContainerOperationClient implements ScmClient {
       LOG.info("Created container " + containerId +
           " leader:" + pipeline.getLeader() +
           " machines:" + pipeline.getMachines() +
-          " replication factor:" + replicationFactor.getValue());
+          " replication factor:" + factor);
       return pipeline;
     } finally {
       if (client != null) {
@@ -150,6 +147,17 @@ public class ContainerOperationClient implements ScmClient {
   }
 
   /**
+   * Creates a specified replication pipeline.
+   */
+  @Override
+  public Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type,
+      OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool)
+      throws IOException {
+    return storageContainerLocationClient.createReplicationPipeline(type,
+        factor, nodePool);
+  }
+
+  /**
    * Delete the container, this will release any resource it uses.
    * @param pipeline - Pipeline that represents the container.
    * @param force - True to forcibly delete the container.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java
index c651a8b..2c2d244 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ScmClient.java
@@ -95,41 +95,16 @@ public interface ScmClient {
   long getContainerSize(Pipeline pipeline) throws IOException;
 
   /**
-   * Replication factors supported by Ozone and SCM.
-   */
-  enum ReplicationFactor{
-    ONE(1),
-    THREE(3);
-
-    private final int value;
-    ReplicationFactor(int value) {
-      this.value = value;
-    }
-
-    public int getValue() {
-      return value;
-    }
-
-    public static ReplicationFactor parseReplicationFactor(int i) {
-      switch (i) {
-      case 1: return ONE;
-      case 3: return THREE;
-      default:
-        throw new IllegalArgumentException("Only replication factor 1 or 3" +
-            " is supported by Ozone/SCM.");
-      }
-    }
-  }
-
-  /**
    * Creates a Container on SCM and returns the pipeline.
-   * @param containerId - String container ID
-   * @param replicationFactor - replication factor (only 1/3 is supported)
+   * @param type - Replication Type.
+   * @param replicationFactor - Replication Factor
+   * @param containerId - Container ID
    * @return Pipeline
-   * @throws IOException
+   * @throws IOException - in case of error.
    */
-  Pipeline createContainer(String containerId,
-      ReplicationFactor replicationFactor) throws IOException;
+  Pipeline createContainer(OzoneProtos.ReplicationType type,
+      OzoneProtos.ReplicationFactor replicationFactor, String containerId)
+      throws IOException;
 
   /**
    * Returns a set of Nodes that meet a query criteria.
@@ -141,4 +116,15 @@ public interface ScmClient {
    */
   OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState> nodeStatuses,
       OzoneProtos.QueryScope queryScope, String poolName) throws IOException;
+
+  /**
+   * Creates a specified replication pipeline.
+   * @param type - Type
+   * @param factor - Replication factor
+   * @param nodePool - Set of machines.
+   * @throws IOException
+   */
+  Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type,
+      OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool)
+      throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
index 6bb5800..ea0893e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
@@ -1,19 +1,18 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  */
 
 package org.apache.hadoop.scm.protocol;
@@ -22,7 +21,6 @@ import java.io.IOException;
 import java.util.EnumSet;
 import java.util.List;
 
-import org.apache.hadoop.scm.client.ScmClient;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 
@@ -31,26 +29,14 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
  * that currently host a container.
  */
 public interface StorageContainerLocationProtocol {
-
-  /**
-   * Asks SCM where a container should be allocated. SCM responds with the
-   * set of datanodes that should be used creating this container.
-   * @param containerName - Name of the container.
-   * @return Pipeline.
-   * @throws IOException
-   */
-  Pipeline allocateContainer(String containerName) throws IOException;
-
   /**
    * Asks SCM where a container should be allocated. SCM responds with the
    * set of datanodes that should be used creating this container.
-   * @param containerName - Name of the container.
-   * @param replicationFactor - replication factor.
-   * @return Pipeline.
-   * @throws IOException
+   *
    */
-  Pipeline allocateContainer(String containerName,
-      ScmClient.ReplicationFactor replicationFactor) throws IOException;
+  Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType,
+      OzoneProtos.ReplicationFactor factor,
+      String containerName) throws IOException;
 
   /**
    * Ask SCM the location of the container. SCM responds with a group of
@@ -99,4 +85,15 @@ public interface StorageContainerLocationProtocol {
   OzoneProtos.NodePool queryNode(EnumSet<OzoneProtos.NodeState> nodeStatuses,
       OzoneProtos.QueryScope queryScope, String poolName) throws IOException;
 
+  /**
+   * Creates a replication pipeline of a specified type.
+   * @param type - replication type
+   * @param factor - factor 1 or 3
+   * @param nodePool - optional machine list to build a pipeline.
+   * @throws IOException
+   */
+  Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type,
+      OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool)
+      throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 7ec4a86..93cd0cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -21,12 +21,10 @@ import com.google.common.base.Strings;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-import org.apache.hadoop.scm.client.ScmClient;
 import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
@@ -37,6 +35,8 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolPr
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
 
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
@@ -74,37 +74,27 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
 
   /**
    * Asks SCM where a container should be allocated. SCM responds with the set
-   * of datanodes that should be used creating this container.
-   *
-   * @param containerName - Name of the container.
-   * @return Pipeline.
-   * @throws IOException
-   */
-  @Override
-  public Pipeline allocateContainer(String containerName) throws IOException {
-    return allocateContainer(containerName, ScmClient.ReplicationFactor.ONE);
-  }
-
-  /**
-   * Asks SCM where a container should be allocated. SCM responds with the set
    * of datanodes that should be used creating this container. Ozone/SCM only
    * supports replication factor of either 1 or 3.
-   *
-   * @param containerName - Name of the container.
-   * @param replicationFactor - replication factor.
-   * @return Pipeline.
+   * @param type - Replication Type
+   * @param factor - Replication Count
+   * @param containerName - Name
+   * @return Pipeline.
    * @throws IOException
    */
   @Override
-  public Pipeline allocateContainer(String containerName,
-      ScmClient.ReplicationFactor replicationFactor) throws IOException {
+  public Pipeline allocateContainer(OzoneProtos.ReplicationType type,
+      OzoneProtos.ReplicationFactor factor, String
+      containerName) throws IOException {
 
     Preconditions.checkNotNull(containerName, "Container Name cannot be Null");
     Preconditions.checkState(!containerName.isEmpty(), "Container name cannot" +
         " be empty");
     ContainerRequestProto request = ContainerRequestProto.newBuilder()
-        .setContainerName(containerName).setReplicationFactor(PBHelperClient
-            .convertReplicationFactor(replicationFactor)).build();
+        .setContainerName(containerName)
+        .setReplicationFactor(factor)
+        .setReplicationType(type)
+        .build();
 
     final ContainerResponseProto response;
     try {
@@ -217,6 +207,42 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
 
   }
 
+  /**
+   * Creates a replication pipeline of a specified type.
+   *
+   * @param replicationType - replication type
+   * @param factor - factor 1 or 3
+   * @param nodePool - optional machine list to build a pipeline.
+   * @throws IOException
+   */
+  @Override
+  public Pipeline createReplicationPipeline(OzoneProtos.ReplicationType
+      replicationType, OzoneProtos.ReplicationFactor factor, OzoneProtos
+      .NodePool nodePool) throws IOException {
+    PipelineRequestProto request = PipelineRequestProto.newBuilder()
+        .setNodePool(nodePool)
+        .setReplicationFactor(factor)
+        .setReplicationType(replicationType)
+        .build();
+    try {
+      PipelineResponseProto response =
+          rpcProxy.allocatePipeline(NULL_RPC_CONTROLLER, request);
+      if (response.getErrorCode() ==
+          PipelineResponseProto.Error.success) {
+        Preconditions.checkState(response.hasPipeline(), "With success, " +
+            "must come a pipeline");
+        return Pipeline.getFromProtoBuf(response.getPipeline());
+      } else {
+        String errorMessage = String.format("create replication pipeline " +
+                "failed. code : %s Message: %s", response.getErrorCode(),
+            response.hasErrorMessage() ? response.getErrorMessage() : "");
+        throw new IOException(errorMessage);
+      }
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
   @Override
   public Object getUnderlyingProxyObject() {
     return rpcProxy;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
index bedd9a8..a2acdff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/ratis/RatisHelper.java
@@ -41,7 +41,7 @@ public interface RatisHelper {
   Logger LOG = LoggerFactory.getLogger(RatisHelper.class);
 
   static String toRaftPeerIdString(DatanodeID id) {
-    return id.getIpAddr() + ":" + id.getContainerPort();
+    return id.getIpAddr() + ":" + id.getRatisPort();
   }
 
   static RaftPeerId toRaftPeerId(DatanodeID id) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
index ede6ea9..50926c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
@@ -35,6 +35,7 @@ message Pipeline {
     required string leaderID = 1;
     repeated DatanodeIDProto members = 2;
     required string containerName = 3;
+    optional LifeCycleStates state = 4 [default = OPERATIONAL];
 }
 
 message KeyValue {
@@ -71,3 +72,22 @@ message NodePool {
     repeated Node nodes = 1;
 }
 
+
+enum ReplicationType {
+    RATIS = 1;
+    STAND_ALONE = 2;
+    CHAINED = 3;
+}
+
+
+enum ReplicationFactor {
+    ONE = 1;
+    THREE = 3;
+}
+
+enum LifeCycleStates {
+    CLIENT_CREATE = 1; // A request to client to create this object
+    OPERATIONAL = 2; // Mostly an update to SCM via HB or client call.
+    TIMED_OUT = 3; // creation has timed out from SCM's View.
+    DELETED = 4; // object is deleted.
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
index 6c16347..30c7166 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
@@ -37,11 +37,9 @@ import "Ozone.proto";
 message ContainerRequestProto {
   required string containerName = 1;
   // Ozone only support replciation of either 1 or 3.
-  enum ReplicationFactor {
-    ONE = 1;
-    THREE = 3;
-  }
-  required ReplicationFactor replicationFactor = 2;
+  required hadoop.hdfs.ozone.ReplicationFactor replicationFactor = 2;
+  required hadoop.hdfs.ozone.ReplicationType  replicationType = 3;
+
 }
 
 /**
@@ -111,6 +109,28 @@ message NodeQueryResponseProto {
   required hadoop.hdfs.ozone.NodePool datanodes = 1;
 }
 
+/**
+  Request to create a replication pipeline.
+ */
+message PipelineRequestProto {
+  required hadoop.hdfs.ozone.ReplicationType replicationType = 1;
+  required hadoop.hdfs.ozone.ReplicationFactor replicationFactor = 2;
+
+  // if datanodes are specified then pipelines are created using those
+  // datanodes.
+  optional hadoop.hdfs.ozone.NodePool nodePool = 3;
+  optional string pipelineID = 4;
+}
+
+message  PipelineResponseProto {
+  enum Error {
+    success = 1;
+    errorPipelineAlreadyExists = 2;
+  }
+  required Error errorCode = 1;
+  optional hadoop.hdfs.ozone.Pipeline  pipeline = 2;
+  optional string errorMessage = 3;
+}
 
 /**
  * Protocol used from an HDFS node to StorageContainerManager.  See the request
@@ -139,4 +159,21 @@ service StorageContainerLocationProtocolService {
   * Returns a set of Nodes that meet a criteria.
   */
   rpc queryNode(NodeQueryRequestProto)  returns (NodeQueryResponseProto);
+
+ /*
+  * APIs that manage pipelines.
+  *
+  * Pipelines are abstractions offered by SCM and Datanode that allow users
+  * to create a replication pipeline.
+  *
+  * The following APIs allow command line programs like SCM CLI to list
+  * and manage pipelines.
+  */
+
+  /**
+  *  Creates a replication pipeline.
+  */
+  rpc allocatePipeline(PipelineRequestProto)
+      returns (PipelineResponseProto);
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 06d6802..497d734 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -58,7 +58,8 @@ message DatanodeIDProto {
   required uint32 infoPort = 5;  // datanode http port
   required uint32 ipcPort = 6;   // ipc server port
   optional uint32 infoSecurePort = 7 [default = 0]; // datanode https port
-  optional uint32 containerPort = 8 [default = 0]; // Ozone container protocol
+  optional uint32 containerPort = 8 [default = 0]; // Ozone stand_alone protocol
+  optional uint32 ratisPort = 9 [default = 0]; //Ozone ratis port
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
index 368c250..1f22aa8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/cblock/storage/StorageManager.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.cblock.meta.VolumeInfo;
 import org.apache.hadoop.cblock.proto.MountVolumeResponse;
 import org.apache.hadoop.cblock.util.KeyUtil;
 import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.scm.client.ScmClient;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.slf4j.Logger;
@@ -179,9 +180,10 @@ public class StorageManager {
       long allocatedSize = 0;
       ArrayList<String> containerIds = new ArrayList<>();
       while (allocatedSize < volumeSize) {
-        Pipeline pipeline = storageClient.createContainer(
-            KeyUtil.getContainerName(userName, volumeName, containerIdx),
-            ScmClient.ReplicationFactor.ONE);
+        Pipeline pipeline = storageClient.createContainer(OzoneProtos
+                .ReplicationType.STAND_ALONE,
+            OzoneProtos.ReplicationFactor.ONE,
+            KeyUtil.getContainerName(userName, volumeName, containerIdx));
         ContainerDescriptor container =
             new ContainerDescriptor(pipeline.getContainerName());
         container.setPipeline(pipeline);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 5e0a656..66992af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler;
@@ -72,7 +73,7 @@ public class DatanodeStateMachine implements Closeable {
     context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
     heartbeatFrequency = TimeUnit.SECONDS.toMillis(
         OzoneClientUtils.getScmHeartbeatInterval(conf));
-    container = new OzoneContainer(conf);
+    container = new OzoneContainer(datanodeID, new OzoneConfiguration(conf));
     this.datanodeID = datanodeID;
     nextHB = new AtomicLong(Time.monotonicNow());
 
@@ -87,11 +88,6 @@ public class DatanodeStateMachine implements Closeable {
       .build();
   }
 
-  public DatanodeStateMachine(Configuration conf)
-      throws IOException {
-    this(null, conf);
-  }
-
   public void setDatanodeID(DatanodeID datanodeID) {
     this.datanodeID = datanodeID;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index eb2ca3a..994b245 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
 import static org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ReportState.states
     .noContainerReports;
@@ -90,7 +91,16 @@ public class StateContext {
    */
   public int getContainerPort() {
     return parent == null ?
-        -1 : parent.getContainer().getContainerServerPort();
+        INVALID_PORT : parent.getContainer().getContainerServerPort();
+  }
+
+  /**
+   * Gets the Ratis Port.
+   * @return the Ratis port, or INVALID_PORT if no parent is set.
+   */
+  public int getRatisPort() {
+    return parent == null ?
+        INVALID_PORT : parent.getContainer().getRatisContainerServerPort();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
index d35f64d..0552a2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/datanode/InitDatanodeState.java
@@ -113,9 +113,11 @@ public class InitDatanodeState implements DatanodeState,
     }
     File idPath = new File(dataNodeIDPath);
     int containerPort = this.context.getContainerPort();
+    int ratisPort = this.context.getRatisPort();
     DatanodeID datanodeID = this.context.getParent().getDatanodeID();
     if (datanodeID != null) {
       datanodeID.setContainerPort(containerPort);
+      datanodeID.setRatisPort(ratisPort);
       ContainerUtils.writeDatanodeIDTo(datanodeID, idPath);
       LOG.info("Datanode ID is persisted to {}", dataNodeIDPath);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
index 3a6e672..7271cb2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
@@ -29,6 +29,7 @@ import io.netty.handler.logging.LoggingHandler;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,6 +82,16 @@ public final class XceiverServer implements XceiverServerSpi {
     return this.port;
   }
 
+  /**
+   * Returns the Replication type supported by this end-point.
+   *
+   * @return enum -- {Stand_Alone, Ratis, Chained}
+   */
+  @Override
+  public OzoneProtos.ReplicationType getServerType() {
+    return OzoneProtos.ReplicationType.STAND_ALONE;
+  }
+
   @Override
   public void start() throws IOException {
     bossGroup = new NioEventLoopGroup();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index e5b0497..b61d7fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server;
 
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+
 import java.io.IOException;
 
 /** A server endpoint that acts as the communication layer for Ozone
@@ -31,4 +33,11 @@ public interface XceiverServerSpi {
 
   /** Get server IPC port. */
   int getIPCPort();
+
+  /**
+   * Returns the Replication type supported by this end-point.
+   * @return enum -- {Stand_Alone, Ratis, Chained}
+   */
+  OzoneProtos.ReplicationType getServerType();
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 69f3801..ba613c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -18,10 +18,15 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
+import org.apache.hadoop.ozone.container.common.transport.server
+    .XceiverServerSpi;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
@@ -34,7 +39,9 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.ServerSocket;
 import java.util.Collections;
 import java.util.Objects;
 
@@ -44,6 +51,22 @@ import java.util.Objects;
  */
 public final class XceiverServerRatis implements XceiverServerSpi {
   static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
+  private final int port;
+  private final RaftServer server;
+
+  private XceiverServerRatis(
+      String id, int port, String storageDir,
+      ContainerDispatcher dispatcher, RpcType rpcType) throws IOException {
+    Objects.requireNonNull(id, "id == null");
+    this.port = port;
+
+    this.server = RaftServer.newBuilder()
+        .setServerId(RaftPeerId.valueOf(id))
+        .setPeers(Collections.emptyList())
+        .setProperties(newRaftProperties(rpcType, port, storageDir))
+        .setStateMachine(new ContainerStateMachine(dispatcher))
+        .build();
+  }
 
   static RaftProperties newRaftProperties(
       RpcType rpc, int port, String storageDir) {
@@ -52,44 +75,60 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     RaftConfigKeys.Rpc.setType(properties, rpc);
     if (rpc == SupportedRpcType.GRPC) {
       GrpcConfigKeys.Server.setPort(properties, port);
-    } else if (rpc == SupportedRpcType.NETTY) {
-      NettyConfigKeys.Server.setPort(properties, port);
+    } else {
+      if (rpc == SupportedRpcType.NETTY) {
+        NettyConfigKeys.Server.setPort(properties, port);
+      }
     }
     return properties;
   }
 
-  public static XceiverServerRatis newXceiverServerRatis(
+  public static XceiverServerRatis newXceiverServerRatis(String datanodeID,
       Configuration ozoneConf, ContainerDispatcher dispatcher)
       throws IOException {
-    final String id = ozoneConf.get(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_ID);
-    final int port = ozoneConf.getInt(
-        OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
-        OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
-    final String storageDir = ozoneConf.get(
+    final String ratisDir = File.separator + "ratis";
+    int localPort = ozoneConf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT);
+    String storageDir = ozoneConf.get(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR);
+
+    if (Strings.isNullOrEmpty(storageDir)) {
+      storageDir = ozoneConf.get(OzoneConfigKeys
+          .OZONE_CONTAINER_METADATA_DIRS);
+      Preconditions.checkNotNull(storageDir, "ozone.container.metadata.dirs " +
+          "cannot be null, Please check your configs.");
+      storageDir = storageDir.concat(ratisDir);
+      LOG.warn("Storage directory for Ratis is not configured. Mapping Ratis " +
+              "storage under {}. It is a good idea to map this to an SSD disk.",
+          storageDir);
+    }
     final String rpcType = ozoneConf.get(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
     final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
-    return new XceiverServerRatis(id, port, storageDir, dispatcher, rpc);
-  }
 
-  private final int port;
-  private final RaftServer server;
-
-  private XceiverServerRatis(
-      String id, int port, String storageDir,
-      ContainerDispatcher dispatcher, RpcType rpcType) throws IOException {
-    Objects.requireNonNull(id, "id == null");
-    this.port = port;
-
-    this.server = RaftServer.newBuilder()
-        .setServerId(RaftPeerId.valueOf(id))
-        .setPeers(Collections.emptyList())
-        .setProperties(newRaftProperties(rpcType, port, storageDir))
-        .setStateMachine(new ContainerStateMachine(dispatcher))
-        .build();
+    // Get an available port on current node and
+    // use that as the container port
+    if (ozoneConf.getBoolean(OzoneConfigKeys
+            .DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) {
+      try (ServerSocket socket = new ServerSocket(0)) {
+        socket.setReuseAddress(true);
+        localPort = socket.getLocalPort();
+        LOG.info("Found a free port for the server : {}", localPort);
+        // If we have random local ports configured, this means that we are
+        // probably running under MiniOzoneCluster. Ratis locks the storage
+        // directories, so we need to pass a different local directory for
+        // each local instance. So we map ratis directories under datanode ID.
+        storageDir = storageDir.concat(File.separator + datanodeID);
+      } catch (IOException e) {
+        LOG.error("Unable find a random free port for the server, "
+            + "fallback to use default port {}", localPort, e);
+      }
+    }
+    return new XceiverServerRatis(datanodeID, localPort, storageDir,
+        dispatcher, rpc);
   }
 
   @Override
@@ -112,4 +151,14 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   public int getIPCPort() {
     return port;
   }
+
+  /**
+   * Returns the Replication type supported by this end-point.
+   *
+   * @return enum -- {Stand_Alone, Ratis, Chained}
+   */
+  @Override
+  public OzoneProtos.ReplicationType getServerType() {
+    return OzoneProtos.ReplicationType.RATIS;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 792c132..307f59f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.ozoneimpl;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
@@ -33,6 +34,8 @@ import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
 import org.apache.hadoop.ozone.container.common.statemachine.background.BlockDeletingService;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServer;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
 import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
@@ -50,6 +53,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
 
 /**
  * Ozone main class sets up the network server and initializes the container
@@ -62,7 +66,7 @@ public class OzoneContainer {
   private final Configuration ozoneConfig;
   private final ContainerDispatcher dispatcher;
   private final ContainerManager manager;
-  private final XceiverServerSpi server;
+  private final XceiverServerSpi[] server;
   private final ChunkManager chunkManager;
   private final KeyManager keyManager;
   private final BlockDeletingService blockDeletingService;
@@ -73,8 +77,8 @@ public class OzoneContainer {
    * @param ozoneConfig - Config
    * @throws IOException
    */
-  public OzoneContainer(
-      Configuration ozoneConfig) throws IOException {
+  public OzoneContainer(DatanodeID datanodeID, Configuration ozoneConfig) throws
+      IOException {
     this.ozoneConfig = ozoneConfig;
     List<StorageLocation> locations = new LinkedList<>();
     String[] paths = ozoneConfig.getStrings(
@@ -104,12 +108,11 @@ public class OzoneContainer {
 
     this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
 
-    final boolean useRatis = ozoneConfig.getBoolean(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
-    server = useRatis?
-        XceiverServerRatis.newXceiverServerRatis(ozoneConfig, dispatcher)
-        : new XceiverServer(this.ozoneConfig, this.dispatcher);
+    server = new XceiverServerSpi[]{
+        new XceiverServer(this.ozoneConfig, this.dispatcher),
+      XceiverServerRatis.newXceiverServerRatis(datanodeID
+          .getDatanodeUuid().toString(), ozoneConfig, dispatcher)
+    };
   }
 
   /**
@@ -118,7 +121,9 @@ public class OzoneContainer {
    * @throws IOException
    */
   public void start() throws IOException {
-    server.start();
+    for (XceiverServerSpi serverinstance : server) {
+      serverinstance.start();
+    }
     blockDeletingService.start();
     dispatcher.init();
   }
@@ -157,7 +162,9 @@ public class OzoneContainer {
    */
   public void stop() {
     LOG.info("Attempting to stop container services.");
-    server.stop();
+    for(XceiverServerSpi serverinstance: server) {
+      serverinstance.stop();
+    }
     dispatcher.shutdown();
 
     try {
@@ -194,13 +201,31 @@ public class OzoneContainer {
     return this.manager.getNodeReport();
   }
 
+  private int getPortbyType(OzoneProtos.ReplicationType replicationType) {
+    for (XceiverServerSpi serverinstance : server) {
+      if (serverinstance.getServerType() == replicationType) {
+        return serverinstance.getIPCPort();
+      }
+    }
+    return INVALID_PORT;
+  }
+
   /**
    * Returns the container server IPC port.
    *
    * @return Container server IPC port.
    */
   public int getContainerServerPort() {
-    return server.getIPCPort();
+    return getPortbyType(OzoneProtos.ReplicationType.STAND_ALONE);
+  }
+
+  /**
+   * Returns the Ratis container Server IPC port.
+   *
+   * @return Ratis port.
+   */
+  public int getRatisContainerServerPort() {
+    return getPortbyType(OzoneProtos.ReplicationType.RATIS);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index e45afb9..628de42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -30,22 +31,17 @@ import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerLocationProtocolProtos;
 import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
 
-import static org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerLocationProtocolProtos.ContainerRequestProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerLocationProtocolProtos.ContainerResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerLocationProtocolProtos.GetContainerRequestProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerLocationProtocolProtos.GetContainerResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerLocationProtocolProtos.DeleteContainerRequestProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerLocationProtocolProtos.DeleteContainerResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerLocationProtocolProtos.ListContainerResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerLocationProtocolProtos.ListContainerRequestProto;
+import static org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.DeleteContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.DeleteContainerResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
+
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
 
@@ -74,7 +70,8 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
   public ContainerResponseProto allocateContainer(RpcController unused,
       ContainerRequestProto request) throws ServiceException {
     try {
-      Pipeline pipeline = impl.allocateContainer(request.getContainerName());
+      Pipeline pipeline = impl.allocateContainer(request.getReplicationType(),
+          request.getReplicationFactor(), request.getContainerName());
       return ContainerResponseProto.newBuilder()
           .setPipeline(pipeline.getProtobufMessage())
           .setErrorCode(ContainerResponseProto.Error.success)
@@ -161,4 +158,12 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public PipelineResponseProto allocatePipeline(
+      RpcController controller, PipelineRequestProto request)
+      throws ServiceException {
+    // TODO : Wiring this up requires one more patch.
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index e8cc8f0..e320983 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -39,41 +39,24 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeAddressList;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.Type;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerLocationProtocolProtos;
-import org.apache.hadoop.ozone.protocolPB
-    .ScmBlockLocationProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeAddressList;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
+import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
-import org.apache.hadoop.ozone.protocolPB
-    .StorageContainerDatanodeProtocolServerSideTranslatorPB;
-import org.apache.hadoop.ozone.protocolPB
-    .StorageContainerLocationProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerLocationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.scm.block.BlockManager;
 import org.apache.hadoop.ozone.scm.block.BlockManagerImpl;
 import org.apache.hadoop.ozone.scm.container.ContainerMapping;
@@ -81,7 +64,6 @@ import org.apache.hadoop.ozone.scm.container.Mapping;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
-import org.apache.hadoop.scm.client.ScmClient;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@@ -386,21 +368,6 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
   }
 
   /**
-   * Asks SCM where a container should be allocated. SCM responds with the set
-   * of datanodes that should be used creating this container.
-   *
-   * @param containerName - Name of the container.
-   * @return Pipeline.
-   * @throws IOException
-   */
-  @Override
-  public Pipeline allocateContainer(String containerName) throws IOException {
-    checkAdminAccess();
-    return scmContainerManager.allocateContainer(containerName,
-        ScmClient.ReplicationFactor.ONE);
-  }
-
-  /**
    * {@inheritDoc}
    */
   @Override
@@ -458,6 +425,19 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
   }
 
   /**
+   * Creates a replication pipeline of a specified type (stub: returns null).
+   */
+  @Override
+  public Pipeline createReplicationPipeline(
+      OzoneProtos.ReplicationType replicationType,
+      OzoneProtos.ReplicationFactor factor,
+      OzoneProtos.NodePool nodePool)
+      throws IOException {
+     // TODO: not implemented yet; will be addressed in a future patch.
+    return null;
+  }
+
+  /**
    * Queries a list of Node that match a set of statuses.
    * <p>
    * For example, if the nodeStatuses is HEALTHY and RAFT_MEMBER,
@@ -527,11 +507,12 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
    * @throws IOException
    */
   @Override
-  public Pipeline allocateContainer(String containerName,
-      ScmClient.ReplicationFactor replicationFactor) throws IOException {
+  public Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType,
+      OzoneProtos.ReplicationFactor replicationFactor, String containerName)
+      throws IOException {
     checkAdminAccess();
-    return scmContainerManager.allocateContainer(containerName,
-        replicationFactor);
+    return scmContainerManager.allocateContainer(replicationType,
+        replicationFactor, containerName);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index 0eb60e4..e000ccc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.scm.container.Mapping;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
@@ -177,7 +178,12 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
       for (int i = 0; i < count; i++) {
         String containerName = UUID.randomUUID().toString();
         try {
-          Pipeline pipeline = containerManager.allocateContainer(containerName);
+          // TODO: Fix this later when Ratis is made the Default.
+          Pipeline pipeline = containerManager.allocateContainer(
+              OzoneProtos.ReplicationType.STAND_ALONE,
+              OzoneProtos.ReplicationFactor.ONE,
+              containerName);
+
           if (pipeline == null) {
             LOG.warn("Unable to allocate container.");
             continue;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
index 643779d..8daa5d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
@@ -1,4 +1,3 @@
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with this
@@ -22,15 +21,11 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
-import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
-import org.apache.hadoop.scm.ScmConfigKeys;
-import org.apache.hadoop.scm.client.ScmClient;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
@@ -41,8 +36,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
@@ -65,8 +58,7 @@ public class ContainerMapping implements Mapping {
   private final Lock lock;
   private final Charset encoding = Charset.forName("UTF-8");
   private final MetadataStore containerStore;
-  private final ContainerPlacementPolicy placementPolicy;
-  private final long containerSize;
+  private final PipelineSelector pipelineSelector;
 
   /**
    * Constructs a mapping class that creates mapping between container names and
@@ -96,66 +88,10 @@ public class ContainerMapping implements Mapping {
         .build();
 
     this.lock = new ReentrantLock();
-
-    this.containerSize = OzoneConsts.GB * conf.getInt(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
-    this.placementPolicy = createContainerPlacementPolicy(nodeManager, conf);
+    this.pipelineSelector = new PipelineSelector(nodeManager, conf);
   }
 
-  /**
-   * Create pluggable container placement policy implementation instance.
-   *
-   * @param nodeManager - SCM node manager.
-   * @param conf - configuration.
-   * @return SCM container placement policy implementation instance.
-   */
-  @SuppressWarnings("unchecked")
-  private static ContainerPlacementPolicy createContainerPlacementPolicy(
-      final NodeManager nodeManager, final Configuration conf) {
-    Class<? extends ContainerPlacementPolicy> implClass =
-        (Class<? extends ContainerPlacementPolicy>) conf.getClass(
-            ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
-            SCMContainerPlacementRandom.class);
 
-    try {
-      Constructor<? extends ContainerPlacementPolicy> ctor =
-          implClass.getDeclaredConstructor(NodeManager.class,
-              Configuration.class);
-      return ctor.newInstance(nodeManager, conf);
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (InvocationTargetException e) {
-      throw new RuntimeException(implClass.getName()
-          + " could not be constructed.", e.getCause());
-    } catch (Exception e) {
-      LOG.error("Unhandled exception occured, Placement policy will not be " +
-          "functional.");
-      throw new IllegalArgumentException("Unable to load " +
-          "ContainerPlacementPolicy", e);
-    }
-  }
-
-  /**
-   * Translates a list of nodes, ordered such that the first is the leader, into
-   * a corresponding {@link Pipeline} object.
-   * @param nodes - list of datanodes on which we will allocate the container.
-   * The first of the list will be the leader node.
-   * @param containerName container name
-   * @return pipeline corresponding to nodes
-   */
-  private static Pipeline newPipelineFromNodes(final List<DatanodeID> nodes,
-      final String containerName) {
-    Preconditions.checkNotNull(nodes);
-    Preconditions.checkArgument(nodes.size() > 0);
-    String leaderId = nodes.get(0).getDatanodeUuid();
-    Pipeline pipeline = new Pipeline(leaderId);
-    for (DatanodeID node : nodes) {
-      pipeline.addMember(node);
-    }
-    pipeline.setContainerName(containerName);
-    return pipeline;
-  }
 
   /**
    * Returns the Pipeline from the container name.
@@ -192,7 +128,7 @@ public class ContainerMapping implements Mapping {
     List<Pipeline> pipelineList = new ArrayList<>();
     lock.lock();
     try {
-      if(containerStore.isEmpty()) {
+      if (containerStore.isEmpty()) {
         throw new IOException("No container exists in current db");
       }
       MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefixName);
@@ -217,26 +153,14 @@ public class ContainerMapping implements Mapping {
    * Allocates a new container.
    *
    * @param containerName - Name of the container.
-   * @return - Pipeline that makes up this container.
-   * @throws IOException
-   */
-  @Override
-  public Pipeline allocateContainer(final String containerName)
-      throws IOException {
-    return allocateContainer(containerName, ScmClient.ReplicationFactor.ONE);
-  }
-
-  /**
-   * Allocates a new container.
-   *
-   * @param containerName - Name of the container.
    * @param replicationFactor - replication factor of the container.
    * @return - Pipeline that makes up this container.
-   * @throws IOException
+   * @throws IOException - Exception
    */
   @Override
-  public Pipeline allocateContainer(final String containerName,
-      final ScmClient.ReplicationFactor replicationFactor) throws IOException {
+  public Pipeline allocateContainer(OzoneProtos.ReplicationType type,
+      OzoneProtos.ReplicationFactor replicationFactor,
+      final String containerName) throws IOException {
     Preconditions.checkNotNull(containerName);
     Preconditions.checkState(!containerName.isEmpty());
     Pipeline pipeline = null;
@@ -253,18 +177,10 @@ public class ContainerMapping implements Mapping {
         throw new SCMException("Specified container already exists. key : " +
             containerName, SCMException.ResultCodes.CONTAINER_EXISTS);
       }
-      List<DatanodeID> datanodes = placementPolicy.chooseDatanodes(
-          replicationFactor.getValue(), containerSize);
-      // TODO: handle under replicated container
-      if (datanodes != null && datanodes.size() > 0) {
-        pipeline = newPipelineFromNodes(datanodes, containerName);
-        containerStore.put(containerName.getBytes(encoding),
-            pipeline.getProtobufMessage().toByteArray());
-      } else {
-        LOG.debug("Unable to find enough datanodes for new container. " +
-            "Required {} found {}", replicationFactor,
-            datanodes != null ? datanodes.size(): 0);
-      }
+      pipeline = pipelineSelector.getReplicationPipeline(type,
+          replicationFactor, containerName);
+      containerStore.put(containerName.getBytes(encoding),
+          pipeline.getProtobufMessage().toByteArray());
     } finally {
       lock.unlock();
     }
@@ -275,9 +191,8 @@ public class ContainerMapping implements Mapping {
    * Deletes a container from SCM.
    *
    * @param containerName - Container name
-   * @throws IOException
-   *   if container doesn't exist
-   *   or container store failed to delete the specified key.
+   * @throws IOException if container doesn't exist or container store failed to
+   *                     delete the specified key.
    */
   @Override
   public void deleteContainer(String containerName) throws IOException {
@@ -286,7 +201,7 @@ public class ContainerMapping implements Mapping {
       byte[] dbKey = containerName.getBytes(encoding);
       byte[] pipelineBytes =
           containerStore.get(dbKey);
-      if(pipelineBytes == null) {
+      if (pipelineBytes == null) {
         throw new SCMException("Failed to delete container "
             + containerName + ", reason : container doesn't exist.",
             SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
index 194158b..1ef3572 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
@@ -17,7 +17,7 @@
 package org.apache.hadoop.ozone.scm.container;
 
 
-import org.apache.hadoop.scm.client.ScmClient;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.Closeable;
@@ -57,14 +57,6 @@ public interface Mapping extends Closeable {
   List<Pipeline> listContainer(String startName, String prefixName, int count)
       throws IOException;
 
-  /**
-   * Allocates a new container for a given keyName.
-   *
-   * @param containerName - Name
-   * @return - Pipeline that makes up this container.
-   * @throws IOException
-   */
-  Pipeline allocateContainer(String containerName) throws IOException;
 
   /**
    * Allocates a new container for a given keyName and replication factor.
@@ -74,8 +66,9 @@ public interface Mapping extends Closeable {
    * @return - Pipeline that makes up this container.
    * @throws IOException
    */
-  Pipeline allocateContainer(String containerName,
-      ScmClient.ReplicationFactor replicationFactor) throws IOException;
+  Pipeline allocateContainer(OzoneProtos.ReplicationType type,
+      OzoneProtos.ReplicationFactor replicationFactor,
+      String containerName) throws IOException;
 
   /**
    * Deletes a container from SCM.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a245c60b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java
new file mode 100644
index 0000000..6293d84
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineManager.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.pipelines;
+
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Manages Ozone pipelines: creation, lookup, membership updates and closing.
+ */
+public interface PipelineManager {
+
+  /**
+   * Called by the Container Manager while allocating a new container. The
+   * client specifies what kind of replication pipeline is needed, and the
+   * implementation matching the requested replication type is invoked.
+   *
+   * @param containerName - Name of the container.
+   * @param replicationFactor - Replication factor.
+   * @return a Pipeline.
+   * @throws IOException on failure; conditions are implementation specific.
+   */
+  Pipeline getPipeline(String containerName,
+      OzoneProtos.ReplicationFactor replicationFactor) throws IOException;
+
+  /**
+   * Creates a pipeline from a specified set of nodes.
+   * @param pipelineID - Name of the pipeline.
+   * @param datanodes - The list of datanodes that make up this pipeline.
+   */
+  void createPipeline(String pipelineID, List<DatanodeID> datanodes)
+      throws IOException;;
+
+  /**
+   * Closes the pipeline identified by the given pipelineID.
+   */
+  void closePipeline(String pipelineID) throws IOException;
+
+  /**
+   * Lists the members of the pipeline identified by the given pipelineID.
+   * @return the list of datanodes in the pipeline.
+   */
+  List<DatanodeID> getMembers(String pipelineID) throws IOException;
+
+  /**
+   * Updates the datanode list of the pipeline identified by pipelineID.
+   */
+  void updatePipeline(String pipelineID, List<DatanodeID> newDatanodes)
+      throws IOException;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org