You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2017/08/23 19:39:14 UTC

hadoop git commit: HDFS-11888. Ozone: SCM: use state machine for open containers allocated for key/blocks. Contributed by Xiaoyu Yao.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 076bd5d64 -> 8333602a9


HDFS-11888. Ozone: SCM: use state machine for open containers allocated for key/blocks. Contributed by Xiaoyu Yao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8333602a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8333602a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8333602a

Branch: refs/heads/HDFS-7240
Commit: 8333602a9967b3c598adc74544152dd99ea5869b
Parents: 076bd5d
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Wed Aug 23 12:37:09 2017 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Wed Aug 23 12:37:09 2017 -0700

----------------------------------------------------------------------
 .../ozone/client/io/ChunkGroupOutputStream.java |  43 ++--
 .../org/apache/hadoop/scm/ScmConfigKeys.java    |   2 +-
 .../scm/client/ContainerOperationClient.java    |  19 ++
 .../common/helpers/BlockContainerInfo.java      |  45 ++++
 .../container/common/helpers/ContainerInfo.java | 109 +++++++++
 .../StorageContainerLocationProtocol.java       |  18 +-
 ...rLocationProtocolClientSideTranslatorPB.java |  28 ++-
 .../src/main/proto/Ozone.proto                  |  49 +++-
 .../StorageContainerLocationProtocol.proto      |  26 +-
 ...rLocationProtocolServerSideTranslatorPB.java |  18 +-
 .../ozone/scm/StorageContainerManager.java      |  36 ++-
 .../ozone/scm/block/BlockManagerImpl.java       | 241 ++++++++++++++-----
 .../org/apache/hadoop/ozone/scm/cli/SQLCLI.java |  13 +-
 .../ozone/scm/container/ContainerMapping.java   | 150 ++++++++++--
 .../hadoop/ozone/scm/container/Mapping.java     |  22 +-
 .../ozone/scm/exceptions/SCMException.java      |   1 +
 .../StorageContainerDatanodeProtocol.proto      |   6 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java    |  27 ++-
 .../scm/container/TestContainerMapping.java     |  19 +-
 .../ozone/scm/node/TestContainerPlacement.java  |   2 +-
 20 files changed, 708 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 2cc12f4..ca81324 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.Result;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.XceiverClientSpi;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@@ -37,8 +38,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
 
 /**
  * Maintaining a list of ChunkInputStream. Write based on offset.
@@ -60,10 +59,6 @@ public class ChunkGroupOutputStream extends OutputStream {
   private long totalSize;
   private long byteOffset;
 
-  //This has to be removed once HDFS-11888 is resolved.
-  //local cache which will have list of created container names.
-  private static Set<String> containersCreated = new HashSet<>();
-
   public ChunkGroupOutputStream() {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
@@ -293,27 +288,21 @@ public class ChunkGroupOutputStream extends OutputStream {
       XceiverClientSpi xceiverClient =
           xceiverClientManager.acquireClient(pipeline);
       // create container if needed
-      // TODO : should be subKeyInfo.getShouldCreateContainer(), but for now
-      //The following change has to reverted once HDFS-11888 is fixed.
-      if(!containersCreated.contains(containerName)) {
-        synchronized (containerName.intern()) {
-          //checking again, there is a chance that some other thread has
-          // created it.
-          if (!containersCreated.contains(containerName)) {
-            LOG.debug("Need to create container {}.", containerName);
-            try {
-              ContainerProtocolCalls.createContainer(xceiverClient, requestId);
-            } catch (StorageContainerException ex) {
-              if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
-                //container already exist.
-                LOG.debug("Container {} already exists.", containerName);
-              } else {
-                LOG.error("Container creation failed for {}.",
-                    containerName, ex);
-                throw ex;
-              }
-            }
-            containersCreated.add(containerName);
+      if (subKeyInfo.getShouldCreateContainer()) {
+        try {
+          // Block manager sets the container creation stage begin.
+          ContainerProtocolCalls.createContainer(xceiverClient, requestId);
+          storageContainerLocationClient.notifyObjectCreationStage(
+              NotifyObjectCreationStageRequestProto.Type.container,
+              containerName,
+              NotifyObjectCreationStageRequestProto.Stage.complete);
+        } catch (StorageContainerException ex) {
+          if (ex.getResult().equals(Result.CONTAINER_EXISTS)) {
+            //container already exist, this should never happen
+            LOG.debug("Container {} already exists.", containerName);
+          } else {
+            LOG.error("Container creation failed for {}.", containerName, ex);
+            throw ex;
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index 73d81f8..0b081a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -187,7 +187,7 @@ public final class ScmConfigKeys {
 
   public static final String OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE =
       "ozone.scm.container.provision_batch_size";
-  public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 1;
+  public static final int OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT = 5;
 
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
index a90cff4..3c1a266 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/client/ContainerOperationClient.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.scm.client;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerData;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ReadContainerResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
 import org.apache.hadoop.scm.XceiverClientSpi;
 import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.scm.XceiverClientManager;
@@ -86,12 +87,20 @@ public class ContainerOperationClient implements ScmClient {
 
       client = xceiverClientManager.acquireClient(pipeline);
       String traceID = UUID.randomUUID().toString();
+      storageContainerLocationClient.notifyObjectCreationStage(
+          NotifyObjectCreationStageRequestProto.Type.container,
+          containerId,
+          NotifyObjectCreationStageRequestProto.Stage.begin);
       ContainerProtocolCalls.createContainer(client, traceID);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Created container " + containerId
             + " leader:" + pipeline.getLeader()
             + " machines:" + pipeline.getMachines());
       }
+      storageContainerLocationClient.notifyObjectCreationStage(
+          NotifyObjectCreationStageRequestProto.Type.container,
+          containerId,
+          NotifyObjectCreationStageRequestProto.Stage.complete);
       return pipeline;
     } finally {
       if (client != null) {
@@ -116,11 +125,21 @@ public class ContainerOperationClient implements ScmClient {
       // connect to pipeline leader and allocate container on leader datanode.
       client = xceiverClientManager.acquireClient(pipeline);
       String traceID = UUID.randomUUID().toString();
+      storageContainerLocationClient.notifyObjectCreationStage(
+          NotifyObjectCreationStageRequestProto.Type.container,
+          containerId,
+          NotifyObjectCreationStageRequestProto.Stage.begin);
+
       ContainerProtocolCalls.createContainer(client, traceID);
       LOG.info("Created container " + containerId +
           " leader:" + pipeline.getLeader() +
           " machines:" + pipeline.getMachines() +
           " replication factor:" + factor);
+
+      storageContainerLocationClient.notifyObjectCreationStage(
+          NotifyObjectCreationStageRequestProto.Type.container,
+          containerId,
+          NotifyObjectCreationStageRequestProto.Stage.complete);
       return pipeline;
     } finally {
       if (client != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java
new file mode 100644
index 0000000..73c8814
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/BlockContainerInfo.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.scm.container.common.helpers;
+
+/**
+ * Class wraps container + allocated info for containers managed by block svc.
+ */
+public class BlockContainerInfo extends ContainerInfo{
+  private long allocated;
+
+  public BlockContainerInfo(ContainerInfo container, long used) {
+    super(container);
+    this.allocated = used;
+  }
+
+  public long addAllocated(long size) {
+    allocated += size;
+    return allocated;
+  }
+
+  public long subtractAllocated(long size) {
+    allocated -= size;
+    return allocated;
+  }
+
+  public long getAllocated() {
+    return this.allocated;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java
new file mode 100644
index 0000000..16bc0db
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.scm.container.common.helpers;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.util.Time;
+
+/**
+ * Class wraps ozone container info.
+ */
+public class ContainerInfo {
+  private OzoneProtos.LifeCycleState state;
+  private Pipeline pipeline;
+  // The wall-clock ms since the epoch at which the current state enters.
+  private long stateEnterTime;
+
+  ContainerInfo(OzoneProtos.LifeCycleState state, Pipeline pipeline,
+      long stateEnterTime) {
+    this.pipeline = pipeline;
+    this.state = state;
+    this.stateEnterTime = stateEnterTime;
+  }
+
+  public ContainerInfo(ContainerInfo container) {
+    this.pipeline = container.getPipeline();
+    this.state = container.getState();
+    this.stateEnterTime = container.getStateEnterTime();
+  }
+
+  /**
+   * Update the current container state and state enter time to now.
+   * @param state
+   */
+  public void setState(OzoneProtos.LifeCycleState state) {
+    this.state = state;
+    this.stateEnterTime = Time.monotonicNow();
+  }
+
+  public OzoneProtos.LifeCycleState getState() {
+    return state;
+  }
+
+  public long getStateEnterTime() {
+    return stateEnterTime;
+  }
+
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  public OzoneProtos.SCMContainerInfo getProtobuf() {
+    OzoneProtos.SCMContainerInfo.Builder builder =
+        OzoneProtos.SCMContainerInfo.newBuilder();
+    builder.setPipeline(getPipeline().getProtobufMessage());
+    builder.setState(state);
+    builder.setStateEnterTime(stateEnterTime);
+    return builder.build();
+  }
+
+  public static ContainerInfo fromProtobuf(
+      OzoneProtos.SCMContainerInfo info) {
+    ContainerInfo.Builder builder = new ContainerInfo.Builder();
+    builder.setPipeline(Pipeline.getFromProtoBuf(info.getPipeline()));
+    builder.setState(info.getState());
+    builder.setStateEnterTime(info.getStateEnterTime());
+    return builder.build();
+  }
+
+  public static class Builder {
+    private OzoneProtos.LifeCycleState state;
+    private Pipeline pipeline;
+    private long stateEnterTime;
+
+    public Builder setState(OzoneProtos.LifeCycleState state) {
+      this.state = state;
+      return this;
+    }
+
+    public Builder setPipeline(Pipeline pipeline) {
+      this.pipeline = pipeline;
+      return this;
+    }
+
+    public Builder setStateEnterTime(long stateEnterTime) {
+      this.stateEnterTime = stateEnterTime;
+      return this;
+    }
+
+    public ContainerInfo build() {
+      return new ContainerInfo(state, pipeline, stateEnterTime);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
index ea0893e..94134d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/StorageContainerLocationProtocol.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.EnumSet;
 import java.util.List;
 
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 
@@ -35,8 +36,8 @@ public interface StorageContainerLocationProtocol {
    *
    */
   Pipeline allocateContainer(OzoneProtos.ReplicationType replicationType,
-      OzoneProtos.ReplicationFactor factor,
-      String containerName) throws IOException;
+      OzoneProtos.ReplicationFactor factor, String containerName)
+      throws IOException;
 
   /**
    * Ask SCM the location of the container. SCM responds with a group of
@@ -86,6 +87,18 @@ public interface StorageContainerLocationProtocol {
       OzoneProtos.QueryScope queryScope, String poolName) throws IOException;
 
   /**
+   * Notify from client when begin or finish creating objects like pipeline
+   * or containers on datanodes.
+   * Container will be in Operational state after that.
+   * @param type object type
+   * @param name object name
+   * @param stage creation stage
+   */
+  void notifyObjectCreationStage(
+      NotifyObjectCreationStageRequestProto.Type type, String name,
+      NotifyObjectCreationStageRequestProto.Stage stage) throws IOException;
+
+  /**
    * Creates a replication pipeline of a specified type.
    * @param type - replication type
    * @param factor - factor 1 or 3
@@ -95,5 +108,4 @@ public interface StorageContainerLocationProtocol {
   Pipeline createReplicationPipeline(OzoneProtos.ReplicationType type,
       OzoneProtos.ReplicationFactor factor, OzoneProtos.NodePool nodePool)
       throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 93cd0cf..8dc1c6c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -35,9 +35,9 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolPr
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
-
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.Closeable;
@@ -208,6 +208,32 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
   }
 
   /**
+   * Notify from client that creates object on datanodes.
+   * @param type object type
+   * @param name object name
+   * @param stage object creation stage : begin/complete
+   */
+  @Override
+  public void notifyObjectCreationStage(
+      NotifyObjectCreationStageRequestProto.Type type,
+      String name,
+      NotifyObjectCreationStageRequestProto.Stage stage) throws IOException {
+    Preconditions.checkState(!Strings.isNullOrEmpty(name),
+        "Object name cannot be null or empty");
+    NotifyObjectCreationStageRequestProto request =
+        NotifyObjectCreationStageRequestProto.newBuilder()
+            .setType(type)
+            .setName(name)
+            .setStage(stage)
+            .build();
+    try {
+      rpcProxy.notifyObjectCreationStage(NULL_RPC_CONTROLLER, request);
+    } catch(ServiceException e){
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  /**
    * Creates a replication pipeline of a specified type.
    *
    * @param replicationType - replication type

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
index 50926c2..36c3736 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
@@ -35,7 +35,7 @@ message Pipeline {
     required string leaderID = 1;
     repeated DatanodeIDProto members = 2;
     required string containerName = 3;
-    optional LifeCycleStates state = 4 [default = OPERATIONAL];
+    optional LifeCycleState state = 4 [default = OPEN];
 }
 
 message KeyValue {
@@ -72,6 +72,45 @@ message NodePool {
     repeated Node nodes = 1;
 }
 
+/**
+ * LifeCycleState for SCM object creation state machine:
+ *    ->Allocated: allocated on SCM but clean has not started creating it yet.
+ *    ->Creating: allocated and assigned to client to create but not ack-ed yet.
+ *    ->Open: allocated on SCM and created on datanodes and ack-ed by a client.
+ *    ->Close: container closed due to space all used or error?
+ *    ->Timeout -> container failed to create on datanodes or ack-ed by client.
+ *    ->Deleting(TBD) -> container will be deleted after timeout
+ * 1. ALLOCATE-ed containers on SCM can't serve key/block related operation
+ *    until ACK-ed explicitly which changes the state to OPEN.
+ * 2. Only OPEN/CLOSED containers can serve key/block related operation.
+ * 3. ALLOCATE-ed containers that are not ACK-ed timely will be TIMEOUT and
+ *    CLEANUP asynchronously.
+ */
+
+enum LifeCycleState {
+    ALLOCATED = 1;
+    CREATING = 2; // Used for container allocated/created by different client.
+    OPEN =3; // Mostly an update to SCM via HB or client call.
+    CLOSED = 4; // !!State after this has not been used yet.
+    DELETING = 5;
+    DELETED = 6; // object is deleted.
+}
+
+enum LifeCycleEvent {
+    BEGIN_CREATE = 1; // A request to client to create this object
+    COMPLETE_CREATE = 2;
+    CLOSE = 3; // !!Event after this has not been used yet.
+    UPDATE = 4;
+    TIMEOUT = 5; // creation has timed out from SCM's View.
+    DELETE = 6;
+    CLEANUP = 7;
+}
+
+message SCMContainerInfo {
+    required LifeCycleState state = 1;
+    required Pipeline pipeline = 2;
+    optional int64 stateEnterTime = 3;
+}
 
 enum ReplicationType {
     RATIS = 1;
@@ -79,15 +118,7 @@ enum ReplicationType {
     CHAINED = 3;
 }
 
-
 enum ReplicationFactor {
     ONE = 1;
     THREE = 3;
-}
-
-enum LifeCycleStates {
-    CLIENT_CREATE = 1; // A request to client to create this object
-    OPERATIONAL = 2; // Mostly an update to SCM via HB or client call.
-    TIMED_OUT = 3; // creation has timed out from SCM's View.
-    DELETED = 4; // object is deleted.
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
index 30c7166..550f6a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/StorageContainerLocationProtocol.proto
@@ -82,6 +82,24 @@ message DeleteContainerResponseProto {
   // Empty response
 }
 
+message NotifyObjectCreationStageRequestProto {
+  enum Type {
+    container = 1;
+    pipeline = 2;
+  }
+  enum Stage {
+    begin = 1;
+    complete = 2;
+  }
+  required string name = 1;
+  required Type type = 2;
+  required Stage stage = 3;
+}
+
+message NotifyObjectCreationStageResponseProto {
+  // Empty response
+}
+
 /*
  NodeQueryRequest sends a request to SCM asking to send a list of nodes that
  match the NodeState that we are requesting.
@@ -160,7 +178,12 @@ service StorageContainerLocationProtocolService {
   */
   rpc queryNode(NodeQueryRequestProto)  returns (NodeQueryResponseProto);
 
- /*
+  /**
+  * Notify from client when begin or finish creating container or pipeline on datanodes.
+  */
+  rpc notifyObjectCreationStage(NotifyObjectCreationStageRequestProto) returns (NotifyObjectCreationStageResponseProto);
+
+  /*
   *  Apis that Manage Pipelines.
   *
   * Pipelines are abstractions offered by SCM and Datanode that allows users
@@ -175,5 +198,4 @@ service StorageContainerLocationProtocolService {
   */
   rpc allocatePipeline(PipelineRequestProto)
       returns (PipelineResponseProto);
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 628de42..f12aafb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerLocationProtocolProtos;
 import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
 
-import static org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
@@ -39,9 +39,10 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolPr
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.DeleteContainerResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.ListContainerRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
-
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
 
@@ -160,6 +161,19 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
   }
 
   @Override
+  public NotifyObjectCreationStageResponseProto notifyObjectCreationStage(
+      RpcController controller, NotifyObjectCreationStageRequestProto request)
+      throws ServiceException {
+    try {
+      impl.notifyObjectCreationStage(request.getType(), request.getName(),
+          request.getStage());
+      return NotifyObjectCreationStageResponseProto.newBuilder().build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
   public PipelineResponseProto allocatePipeline(
       RpcController controller, PipelineRequestProto request)
       throws ServiceException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index e320983..fa14ad0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
 import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
 import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolServerSideTranslatorPB;
@@ -65,6 +66,7 @@ import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.node.SCMNodeManager;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
@@ -373,7 +375,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
   @Override
   public Pipeline getContainer(String containerName) throws IOException {
     checkAdminAccess();
-    return scmContainerManager.getContainer(containerName);
+    return scmContainerManager.getContainer(containerName).getPipeline();
   }
 
   /**
@@ -425,6 +427,34 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
   }
 
   /**
+   * Notify from client when begin/finish creating container/pipeline objects
+   * on datanodes.
+   * @param type
+   * @param name
+   * @param stage
+   */
+  @Override
+  public void notifyObjectCreationStage(
+      NotifyObjectCreationStageRequestProto.Type type, String name,
+      NotifyObjectCreationStageRequestProto.Stage stage) throws IOException {
+
+    if (type == NotifyObjectCreationStageRequestProto.Type.container) {
+      ContainerInfo info = scmContainerManager.getContainer(name);
+      LOG.info("Container {} current state {} new stage {}", name,
+          info.getState(), stage);
+      if (stage == NotifyObjectCreationStageRequestProto.Stage.begin) {
+        scmContainerManager.updateContainerState(name,
+            OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
+      } else {
+        scmContainerManager.updateContainerState(name,
+            OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
+      }
+    } else if (type == NotifyObjectCreationStageRequestProto.Type.pipeline) {
+      // TODO: pipeline state update will be addressed in future patch.
+    }
+  }
+
+  /**
    * Creates a replication pipeline of a specified type.
    */
   @Override
@@ -503,7 +533,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
    *
    * @param containerName - Name of the container.
    * @param replicationFactor - replication factor.
-   * @return Pipeline.
+   * @return pipeline
    * @throws IOException
    */
   @Override
@@ -512,7 +542,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
       throws IOException {
     checkAdminAccess();
     return scmContainerManager.allocateContainer(replicationType,
-        replicationFactor, containerName);
+        replicationFactor, containerName).getPipeline();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index e000ccc..5730589 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.scm.block;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.metrics2.util.MBeans;
@@ -29,6 +30,8 @@ import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.utils.BatchOperation;
@@ -83,8 +86,11 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   private final long containerSize;
   private final long cacheSize;
 
-  private final MetadataStore openContainerStore;
-  private Map<String, Long> openContainers;
+  // Track all containers owned by block service.
+  private final MetadataStore containerStore;
+
+  private Map<OzoneProtos.LifeCycleState,
+      Map<String, BlockContainerInfo>> containers;
   private final int containerProvisionBatchSize;
   private final Random rand;
   private final ObjectName mxBean;
@@ -121,14 +127,13 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
 
     // Load store of all open contains for block allocation
     File openContainsDbPath = new File(scmMetaDataDir, OPEN_CONTAINERS_DB);
-    openContainerStore = MetadataStoreBuilder.newBuilder()
+    containerStore = MetadataStoreBuilder.newBuilder()
         .setConf(conf)
         .setDbFile(openContainsDbPath)
         .setCacheSize(this.cacheSize * OzoneConsts.MB)
         .build();
 
-    openContainers = new ConcurrentHashMap<>();
-    loadOpenContainers();
+    loadAllocatedContainers();
 
     this.containerProvisionBatchSize = conf.getInt(
         ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
@@ -141,20 +146,39 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
 
   // TODO: close full (or almost full) containers with a separate thread.
   /**
-   * Load open containers from persistent store.
+   * Load allocated containers from persistent store.
    * @throws IOException
    */
-  private void loadOpenContainers() throws IOException {
+  private void loadAllocatedContainers() throws IOException {
+    // Pre-allocate empty map entry by state to avoid null check
+    containers = new ConcurrentHashMap<>();
+    for (OzoneProtos.LifeCycleState state :
+        OzoneProtos.LifeCycleState.values()) {
+      containers.put(state, new ConcurrentHashMap());
+    }
     try {
-      openContainerStore.iterate(null, (key, value) -> {
+      containerStore.iterate(null, (key, value) -> {
         try {
           String containerName = DFSUtil.bytes2String(key);
           Long containerUsed = Long.parseLong(DFSUtil.bytes2String(value));
-          openContainers.put(containerName, containerUsed);
-          LOG.debug("Loading open container: {} used : {}", containerName,
-              containerUsed);
+          ContainerInfo containerInfo =
+              containerManager.getContainer(containerName);
+          // TODO: remove the container from block manager's container DB
+          // Most likely the allocated container is timeout and cleaned up
+          // by SCM, we should clean up correspondingly instead of just skip it.
+          if (containerInfo == null) {
+            LOG.warn("Container {} allocated by block service" +
+                "can't be found in SCM", containerName);
+            return true;
+          }
+          Map<String, BlockContainerInfo> containersByState =
+              containers.get(containerInfo.getState());
+          containersByState.put(containerName,
+              new BlockContainerInfo(containerInfo, containerUsed));
+          LOG.debug("Loading allocated container: {} used : {} state: {}",
+              containerName, containerUsed, containerInfo.getState());
         } catch (Exception e) {
-          LOG.warn("Failed loading open container, continue next...");
+          LOG.warn("Failed loading allocated container, continue next...");
         }
         return true;
       });
@@ -166,25 +190,26 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   }
 
   /**
-   * Pre-provision specified count of containers for block creation.
-   * @param count - number of containers to create.
-   * @return list of container names created.
+   * Pre allocate specified count of containers for block creation.
+   * @param count - number of containers to allocate.
+   * @return list of container names allocated.
    * @throws IOException
    */
-  private List<String> provisionContainers(int count) throws IOException {
+  private List<String> allocateContainers(int count) throws IOException {
     List<String> results = new ArrayList();
     lock.lock();
     try {
       for (int i = 0; i < count; i++) {
         String containerName = UUID.randomUUID().toString();
+        ContainerInfo containerInfo = null;
         try {
           // TODO: Fix this later when Ratis is made the Default.
-          Pipeline pipeline = containerManager.allocateContainer(
+          containerInfo = containerManager.allocateContainer(
               OzoneProtos.ReplicationType.STAND_ALONE,
               OzoneProtos.ReplicationFactor.ONE,
               containerName);
 
-          if (pipeline == null) {
+          if (containerInfo == null) {
             LOG.warn("Unable to allocate container.");
             continue;
           }
@@ -192,8 +217,12 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           LOG.warn("Unable to allocate container: " + ex);
           continue;
         }
-        openContainers.put(containerName, 0L);
-        openContainerStore.put(DFSUtil.string2Bytes(containerName),
+        Map<String, BlockContainerInfo> containersByState =
+            containers.get(OzoneProtos.LifeCycleState.ALLOCATED);
+        Preconditions.checkNotNull(containersByState);
+        containersByState.put(containerName,
+            new BlockContainerInfo(containerInfo, 0));
+        containerStore.put(DFSUtil.string2Bytes(containerName),
             DFSUtil.string2Bytes(Long.toString(0L)));
         results.add(containerName);
       }
@@ -204,6 +233,76 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   }
 
   /**
+   * Filter container by states and size.
+   * @param state the state of the container.
+   * @param size the minimal available size of the container
+   * @return allocated containers satisfy both state and size.
+   */
+  private List <String> filterContainers(OzoneProtos.LifeCycleState state,
+      long size) {
+    Map<String, BlockContainerInfo> containersByState =
+        this.containers.get(state);
+    return containersByState.entrySet().parallelStream()
+        .filter(e -> ((e.getValue().getAllocated() + size < containerSize)))
+        .map(e -> e.getKey())
+        .collect(Collectors.toList());
+  }
+
+  private BlockContainerInfo getContainer(OzoneProtos.LifeCycleState state,
+      String name) {
+    Map<String, BlockContainerInfo> containersByState = this.containers.get(state);
+    return containersByState.get(name);
+  }
+
+  // Relies on the caller such as allocateBlock() to hold the lock
+  // to ensure containers map consistent.
+  private void updateContainer(OzoneProtos.LifeCycleState oldState, String name,
+      OzoneProtos.LifeCycleState newState) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Update container {} from state {} to state {}",
+          name, oldState, newState);
+    }
+    Map<String, BlockContainerInfo> containersInOldState =
+        this.containers.get(oldState);
+    BlockContainerInfo containerInfo = containersInOldState.get(name);
+    Preconditions.checkNotNull(containerInfo);
+    containersInOldState.remove(name);
+    Map<String, BlockContainerInfo> containersInNewState =
+        this.containers.get(newState);
+    containersInNewState.put(name, containerInfo);
+  }
+
+  // Refresh containers that have been allocated.
+  // We may not need to track all the states, just the creating/open/close
+  // should be enough for now.
+  private void refreshContainers() {
+    Map<String, BlockContainerInfo> containersByState =
+        this.containers.get(OzoneProtos.LifeCycleState.ALLOCATED);
+    for (String containerName: containersByState.keySet()) {
+      try {
+        ContainerInfo containerInfo =
+            containerManager.getContainer(containerName);
+        if (containerInfo == null) {
+          // TODO: clean up containers that has been deleted on SCM but
+          // TODO: still in ALLOCATED state in block manager.
+          LOG.debug("Container {} allocated by block service" +
+              "can't be found in SCM", containerName);
+          continue;
+        }
+        if (containerInfo.getState() == OzoneProtos.LifeCycleState.OPEN) {
+          updateContainer(OzoneProtos.LifeCycleState.ALLOCATED, containerName,
+              containerInfo.getState());
+        }
+        // TODO: check containers in other state and refresh as needed.
+        // TODO: ALLOCATED container that is timeout and DELETED. (unit test)
+        // TODO: OPEN container that is CLOSE.
+      } catch (IOException ex) {
+        LOG.debug("Failed to get container info for: {}", containerName);
+      }
+    }
+   }
+
+  /**
    * Allocates a new block for a given size.
    *
    * SCM choose one of the open containers and returns that as the location for
@@ -215,8 +314,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
    */
   @Override
   public AllocatedBlock allocateBlock(final long size) throws IOException {
-    boolean createContainer;
-    Pipeline pipeline;
+    boolean createContainer = false;
     if (size < 0 || size > containerSize) {
       throw new SCMException("Unsupported block size",
           INVALID_BLOCK_SIZE);
@@ -228,37 +326,29 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
 
     lock.lock();
     try {
+      refreshContainers();
       List<String> candidates;
-      if (openContainers.size() == 0) {
-        try {
-          candidates = provisionContainers(containerProvisionBatchSize);
-        } catch (IOException ex) {
-          throw new SCMException("Unable to allocate container for the block",
-              FAILED_TO_ALLOCATE_CONTAINER);
-        }
-          createContainer = true;
-      } else {
-        candidates = openContainers.entrySet().parallelStream()
-            .filter(e -> (e.getValue() + size < containerSize))
-            .map(e -> e.getKey())
-            .collect(Collectors.toList());
-        createContainer = false;
-      }
-
+      candidates = filterContainers(OzoneProtos.LifeCycleState.OPEN, size);
       if (candidates.size() == 0) {
-        try {
-          candidates = provisionContainers(containerProvisionBatchSize);
-        } catch (IOException ex) {
-          throw new SCMException("Unable to allocate container for the block",
-              FAILED_TO_ALLOCATE_CONTAINER);
+        candidates = filterContainers(OzoneProtos.LifeCycleState.ALLOCATED,
+            size);
+        if (candidates.size() == 0) {
+          try {
+            candidates = allocateContainers(containerProvisionBatchSize);
+          } catch (IOException ex) {
+            LOG.error("Unable to allocate container for the block.");
+            throw new SCMException("Unable to allocate container for the block",
+                FAILED_TO_ALLOCATE_CONTAINER);
+          }
+        }
+        // now we should have some candidates in ALLOCATE state
+        if (candidates.size() == 0) {
+          throw new SCMException("Fail to find any container to allocate block " +
+              "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
         }
       }
 
-      if (candidates.size() == 0) {
-        throw new SCMException("Fail to find any container to allocate block " +
-            "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
-      }
-
+      // Candidates list now should include only ALLOCATE or OPEN containers
       int randomIdx = rand.nextInt(candidates.size());
       String containerName = candidates.get(randomIdx);
       if (LOG.isDebugEnabled()) {
@@ -266,28 +356,46 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
             candidates.toString(), containerName);
       }
 
-      pipeline = containerManager.getContainer(containerName);
-      if (pipeline == null) {
+      ContainerInfo containerInfo =
+          containerManager.getContainer(containerName);
+      if (containerInfo == null) {
         LOG.debug("Unable to find container for the block");
         throw new SCMException("Unable to find container to allocate block",
             FAILED_TO_FIND_CONTAINER);
       }
 
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Candidate {} state {}", containerName,
+            containerInfo.getState());
+      }
+      // Container must be either OPEN or ALLOCATE state
+      if (containerInfo.getState() == OzoneProtos.LifeCycleState.ALLOCATED) {
+        createContainer = true;
+      }
+
       // TODO: make block key easier to debug (e.g., seq no)
       // Allocate key for the block
       String blockKey = UUID.randomUUID().toString();
       AllocatedBlock.Builder abb = new AllocatedBlock.Builder()
-          .setKey(blockKey).setPipeline(pipeline)
+          .setKey(blockKey).setPipeline(containerInfo.getPipeline())
           .setShouldCreateContainer(createContainer);
-      if (pipeline.getMachines().size() > 0) {
+      if (containerInfo.getPipeline().getMachines().size() > 0) {
         blockStore.put(DFSUtil.string2Bytes(blockKey),
             DFSUtil.string2Bytes(containerName));
 
         // update the container usage information
-        Long newUsed = openContainers.get(containerName) + size;
-        openContainers.put(containerName, newUsed);
-        openContainerStore.put(DFSUtil.string2Bytes(containerName),
-            DFSUtil.string2Bytes(Long.toString(newUsed)));
+        BlockContainerInfo containerInfoUpdate =
+            getContainer(containerInfo.getState(), containerName);
+        Preconditions.checkNotNull(containerInfoUpdate);
+        containerInfoUpdate.addAllocated(size);
+        containerStore.put(DFSUtil.string2Bytes(containerName),
+            DFSUtil.string2Bytes(Long.toString(containerInfoUpdate.getAllocated())));
+        if (createContainer) {
+          OzoneProtos.LifeCycleState newState =
+              containerManager.updateContainerState(containerName,
+              OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
+          updateContainer(containerInfo.getState(), containerName, newState);
+        }
         return abb.build();
       }
     } finally {
@@ -312,8 +420,16 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
         throw new SCMException("Specified block key does not exist. key : " +
             key, FAILED_TO_FIND_BLOCK);
       }
-      return containerManager.getContainer(
-          DFSUtil.bytes2String(containerBytes));
+      String containerName = DFSUtil.bytes2String(containerBytes);
+      ContainerInfo containerInfo = containerManager.getContainer(
+          containerName);
+      if (containerInfo == null) {
+          LOG.debug("Container {} allocated by block service" +
+              "can't be found in SCM", containerName);
+          throw new SCMException("Unable to find container for the block",
+              SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+      }
+      return containerInfo.getPipeline();
     } finally {
       lock.unlock();
     }
@@ -338,8 +454,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
         throw new SCMException("Specified block key does not exist. key : " +
             key, FAILED_TO_FIND_BLOCK);
       }
+      // TODO: track the block size info so that we can reclaim the container
+      // TODO: used space when the block is deleted.
       BatchOperation batch = new BatchOperation();
-      containerManager.getContainer(DFSUtil.bytes2String(containerBytes));
       String deletedKeyName = getDeletedKeyName(key);
       // Add a tombstone for the deleted key
       batch.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
@@ -370,8 +487,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     if (blockStore != null) {
       blockStore.close();
     }
-    if (openContainerStore != null) {
-      openContainerStore.close();
+    if (containerStore != null) {
+      containerStore.close();
     }
 
     MBeans.unregister(mxBean);
@@ -379,6 +496,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
 
   @Override
   public int getOpenContainersNo() {
-    return openContainers.size();
+    return containers.get(OzoneProtos.LifeCycleState.OPEN).size();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
index 9e4053e..d674b64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.scm.cli;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
@@ -33,8 +34,10 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.Buck
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeInfo;
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.VolumeList;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Pipeline;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.hadoop.utils.MetadataStore;
@@ -482,10 +485,14 @@ public class SQLCLI  extends Configured implements Tool {
       HashSet<String> uuidChecked = new HashSet<>();
       dbStore.iterate(null, (key, value) -> {
         String containerName = new String(key, encoding);
-        Pipeline pipeline = null;
-        pipeline = Pipeline.parseFrom(value);
+        ContainerInfo containerInfo = null;
+        containerInfo = ContainerInfo.fromProtobuf(
+            OzoneProtos.SCMContainerInfo.PARSER.parseFrom(value));
+        Preconditions.checkNotNull(containerInfo);
         try {
-          insertContainerDB(conn, containerName, pipeline, uuidChecked);
+          //TODO: include container state to sqllite schema
+          insertContainerDB(conn, containerName,
+              containerInfo.getPipeline().getProtobufMessage(), uuidChecked);
           return true;
         } catch (SQLException e) {
           throw new IOException(e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
index 8daa5d4..3cf78d9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
@@ -20,13 +20,17 @@ package org.apache.hadoop.ozone.scm.container;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
 import org.apache.hadoop.utils.MetadataStore;
@@ -38,8 +42,10 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -60,6 +66,9 @@ public class ContainerMapping implements Mapping {
   private final MetadataStore containerStore;
   private final PipelineSelector pipelineSelector;
 
+  private final StateMachine<OzoneProtos.LifeCycleState,
+        OzoneProtos.LifeCycleEvent> stateMachine;
+
   /**
    * Constructs a mapping class that creates mapping between container names and
    * pipelines.
@@ -88,31 +97,80 @@ public class ContainerMapping implements Mapping {
         .build();
 
     this.lock = new ReentrantLock();
+
     this.pipelineSelector = new PipelineSelector(nodeManager, conf);
+
+    // Initialize the container state machine.
+    Set<OzoneProtos.LifeCycleState> finalStates = new HashSet();
+    finalStates.add(OzoneProtos.LifeCycleState.OPEN);
+    finalStates.add(OzoneProtos.LifeCycleState.CLOSED);
+    finalStates.add(OzoneProtos.LifeCycleState.DELETED);
+
+    this.stateMachine = new StateMachine<>(
+        OzoneProtos.LifeCycleState.ALLOCATED, finalStates);
+    initializeStateMachine();
   }
 
+  // Client-driven Create State Machine
+  // States: <ALLOCATED>------------->CREATING----------------->[OPEN]
+  // Events:            (BEGIN_CREATE)    |    (COMPLETE_CREATE)
+  //                                      |
+  //                                      |(TIMEOUT)
+  //                                      V
+  //                                  DELETING----------------->[DELETED]
+  //                                           (CLEANUP)
+
+  // SCM Open/Close State Machine
+  // States: OPEN------------------>[CLOSED]
+  // Events:        (CLOSE)
+
+  // Delete State Machine
+  // States: OPEN------------------>DELETING------------------>[DELETED]
+  // Events:         (DELETE)                  (CLEANUP)
+  private void initializeStateMachine() {
+    stateMachine.addTransition(OzoneProtos.LifeCycleState.ALLOCATED,
+        OzoneProtos.LifeCycleState.CREATING,
+        OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
+
+    stateMachine.addTransition(OzoneProtos.LifeCycleState.CREATING,
+        OzoneProtos.LifeCycleState.OPEN,
+        OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
+
+    stateMachine.addTransition(OzoneProtos.LifeCycleState.OPEN,
+        OzoneProtos.LifeCycleState.CLOSED,
+        OzoneProtos.LifeCycleEvent.CLOSE);
 
+    stateMachine.addTransition(OzoneProtos.LifeCycleState.OPEN,
+        OzoneProtos.LifeCycleState.DELETING,
+        OzoneProtos.LifeCycleEvent.DELETE);
+
+    stateMachine.addTransition(OzoneProtos.LifeCycleState.DELETING,
+        OzoneProtos.LifeCycleState.DELETED,
+        OzoneProtos.LifeCycleEvent.CLEANUP);
+
+    // Creating timeout -> Deleting
+    stateMachine.addTransition(OzoneProtos.LifeCycleState.CREATING,
+        OzoneProtos.LifeCycleState.DELETING,
+        OzoneProtos.LifeCycleEvent.TIMEOUT);
+  }
 
   /**
-   * Returns the Pipeline from the container name.
-   *
-   * @param containerName - Name
-   * @return - Pipeline that makes up this container.
+   * {@inheritDoc}
    */
   @Override
-  public Pipeline getContainer(final String containerName) throws IOException {
-    Pipeline pipeline;
+  public ContainerInfo getContainer(final String containerName) throws IOException {
+    ContainerInfo containerInfo;
     lock.lock();
     try {
-      byte[] pipelineBytes =
+      byte[] containerBytes =
           containerStore.get(containerName.getBytes(encoding));
-      if (pipelineBytes == null) {
+      if (containerBytes == null) {
         throw new SCMException("Specified key does not exist. key : " +
             containerName, SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
       }
-      pipeline = Pipeline.getFromProtoBuf(
-          OzoneProtos.Pipeline.PARSER.parseFrom(pipelineBytes));
-      return pipeline;
+      containerInfo = ContainerInfo.fromProtobuf(
+          OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes));
+      return containerInfo;
     } finally {
       lock.unlock();
     }
@@ -138,10 +196,13 @@ public class ContainerMapping implements Mapping {
           containerStore.getRangeKVs(startKey, count, prefixFilter);
 
       // Transform the values into the pipelines.
+      // TODO: return list of ContainerInfo instead of pipelines.
+      // TODO: filter by container state
       for (Map.Entry<byte[], byte[]> entry : range) {
-        Pipeline pipeline = Pipeline.getFromProtoBuf(
-            OzoneProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
-        pipelineList.add(pipeline);
+        ContainerInfo containerInfo =  ContainerInfo.fromProtobuf(
+            OzoneProtos.SCMContainerInfo.PARSER.parseFrom(entry.getValue()));
+        Preconditions.checkNotNull(containerInfo);
+        pipelineList.add(containerInfo.getPipeline());
       }
     } finally {
       lock.unlock();
@@ -158,12 +219,12 @@ public class ContainerMapping implements Mapping {
    * @throws IOException - Exception
    */
   @Override
-  public Pipeline allocateContainer(OzoneProtos.ReplicationType type,
+  public ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
       OzoneProtos.ReplicationFactor replicationFactor,
       final String containerName) throws IOException {
     Preconditions.checkNotNull(containerName);
     Preconditions.checkState(!containerName.isEmpty());
-    Pipeline pipeline = null;
+    ContainerInfo containerInfo = null;
     if (!nodeManager.isOutOfNodeChillMode()) {
       throw new SCMException("Unable to create container while in chill mode",
           SCMException.ResultCodes.CHILL_MODE_EXCEPTION);
@@ -171,20 +232,25 @@ public class ContainerMapping implements Mapping {
 
     lock.lock();
     try {
-      byte[] pipelineBytes =
+      byte[] containerBytes =
           containerStore.get(containerName.getBytes(encoding));
-      if (pipelineBytes != null) {
+      if (containerBytes != null) {
         throw new SCMException("Specified container already exists. key : " +
             containerName, SCMException.ResultCodes.CONTAINER_EXISTS);
       }
-      pipeline = pipelineSelector.getReplicationPipeline(type,
+      Pipeline pipeline = pipelineSelector.getReplicationPipeline(type,
           replicationFactor, containerName);
+      containerInfo = new ContainerInfo.Builder()
+          .setState(OzoneProtos.LifeCycleState.ALLOCATED)
+          .setPipeline(pipeline)
+          .setStateEnterTime(Time.monotonicNow())
+          .build();
       containerStore.put(containerName.getBytes(encoding),
-          pipeline.getProtobufMessage().toByteArray());
+          containerInfo.getProtobuf().toByteArray());
     } finally {
       lock.unlock();
     }
-    return pipeline;
+    return containerInfo;
   }
 
   /**
@@ -199,9 +265,9 @@ public class ContainerMapping implements Mapping {
     lock.lock();
     try {
       byte[] dbKey = containerName.getBytes(encoding);
-      byte[] pipelineBytes =
+      byte[] containerBytes =
           containerStore.get(dbKey);
-      if (pipelineBytes == null) {
+      if(containerBytes == null) {
         throw new SCMException("Failed to delete container "
             + containerName + ", reason : container doesn't exist.",
             SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
@@ -213,6 +279,44 @@ public class ContainerMapping implements Mapping {
   }
 
   /**
+   * {@inheritDoc}
+   * Used by client to update container state on SCM.
+   */
+  @Override
+  public OzoneProtos.LifeCycleState updateContainerState(String containerName,
+      OzoneProtos.LifeCycleEvent event) throws IOException {
+    ContainerInfo containerInfo;
+    lock.lock();
+    try {
+      byte[] dbKey = containerName.getBytes(encoding);
+      byte[] containerBytes =
+          containerStore.get(dbKey);
+      if(containerBytes == null) {
+        throw new SCMException("Failed to update container state"
+            + containerName + ", reason : container doesn't exist.",
+            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+      }
+      containerInfo = ContainerInfo.fromProtobuf(
+          OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes));
+
+      OzoneProtos.LifeCycleState newState;
+      try {
+         newState = stateMachine.getNextState(containerInfo.getState(), event);
+      } catch (InvalidStateTransitionException ex) {
+        throw new SCMException("Failed to update container state"
+            + containerName + ", reason : invalid state transition from state: "
+            + containerInfo.getState() + " upon event: " + event + ".",
+            SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE);
+      }
+      containerInfo.setState(newState);
+      containerStore.put(dbKey, containerInfo.getProtobuf().toByteArray());
+      return newState;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
    * Closes this stream and releases any system resources associated with it. If
    * the stream is already closed then invoking this method has no effect.
    * <p>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
index 1ef3572..7cf3f96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.scm.container;
 
 
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.Closeable;
@@ -30,13 +31,13 @@ import java.util.List;
  */
 public interface Mapping extends Closeable {
   /**
-   * Returns the Pipeline from the container name.
+   * Returns the ContainerInfo from the container name.
    *
    * @param containerName - Name
-   * @return - Pipeline that makes up this container.
+   * @return - ContainerInfo such as creation state and the pipeline.
    * @throws IOException
    */
-  Pipeline getContainer(String containerName) throws IOException;
+  ContainerInfo getContainer(String containerName) throws IOException;
 
   /**
    * Returns pipelines under certain conditions.
@@ -57,16 +58,15 @@ public interface Mapping extends Closeable {
   List<Pipeline> listContainer(String startName, String prefixName, int count)
       throws IOException;
 
-
   /**
    * Allocates a new container for a given keyName and replication factor.
    *
    * @param containerName - Name.
    * @param replicationFactor - replication factor of the container.
-   * @return - Pipeline that makes up this container.
+   * @return - Container Info.
    * @throws IOException
    */
-  Pipeline allocateContainer(OzoneProtos.ReplicationType type,
+  ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
       OzoneProtos.ReplicationFactor replicationFactor,
       String containerName) throws IOException;
 
@@ -77,4 +77,14 @@ public interface Mapping extends Closeable {
    * @throws IOException
    */
   void deleteContainer(String containerName) throws IOException;
+
+  /**
+   * Update container state.
+   * @param containerName - Container Name
+   * @param event - container life cycle event
+   * @return - new container state
+   * @throws IOException
+   */
+  OzoneProtos.LifeCycleState updateContainerState(String containerName,
+      OzoneProtos.LifeCycleEvent event) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
index f60bdc6..4e8470d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/exceptions/SCMException.java
@@ -106,6 +106,7 @@ public class SCMException extends IOException {
     CHILL_MODE_EXCEPTION,
     FAILED_TO_LOAD_OPEN_CONTAINER,
     FAILED_TO_ALLOCATE_CONTAINER,
+    FAILED_TO_CHANGE_CONTAINER_STATE,
     CONTAINER_EXISTS,
     FAILED_TO_FIND_CONTAINER,
     FAILED_TO_FIND_CONTAINER_WITH_SAPCE,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index aa52979..8400ee0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -51,7 +51,7 @@ message SCMHeartbeatRequestProto {
   optional ReportState containerReportState = 3;
 }
 
-enum ContainerState {
+enum DatanodeContainerState {
   closed = 0;
   open = 1;
 }
@@ -76,7 +76,7 @@ SCM database, This information allows SCM to startup faster and avoid having
 all container info in memory all the time.
   */
 message ContainerPersistanceProto {
-  required ContainerState state = 1;
+  required DatanodeContainerState state = 1;
   required hadoop.hdfs.ozone.Pipeline pipeline = 2;
   required ContainerInfo info = 3;
 }
@@ -89,8 +89,6 @@ message NodeContianerMapping {
   repeated string contianerName = 1;
 }
 
-
-
 /**
 A container report contains the following information.
 */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index 837d1b7..501475b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -319,20 +319,25 @@ public class TestOzoneRpcClient {
       throws IOException, OzoneException {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
-    String keyName = UUID.randomUUID().toString();
+
     String value = "sample value";
     ozClient.createVolume(volumeName);
     ozClient.createBucket(volumeName, bucketName);
-    OzoneOutputStream out = ozClient.createKey(volumeName, bucketName,
-        keyName, value.getBytes().length);
-    out.write(value.getBytes());
-    out.close();
-    OzoneKey key = ozClient.getKeyDetails(volumeName, bucketName, keyName);
-    Assert.assertEquals(keyName, key.getKeyName());
-    OzoneInputStream is = ozClient.getKey(volumeName, bucketName, keyName);
-    byte[] fileContent = new byte[value.getBytes().length];
-    is.read(fileContent);
-    Assert.assertEquals(value, new String(fileContent));
+
+    for (int i = 0; i < 10; i++) {
+      String keyName = UUID.randomUUID().toString();
+
+      OzoneOutputStream out = ozClient.createKey(volumeName, bucketName,
+          keyName, value.getBytes().length);
+      out.write(value.getBytes());
+      out.close();
+      OzoneKey key = ozClient.getKeyDetails(volumeName, bucketName, keyName);
+      Assert.assertEquals(keyName, key.getKeyName());
+      OzoneInputStream is = ozClient.getKey(volumeName, bucketName, keyName);
+      byte[] fileContent = new byte[value.getBytes().length];
+      is.read(fileContent);
+      Assert.assertEquals(value, new String(fileContent));
+    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
index cc7c9ff..79e6af6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/TestContainerMapping.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.AfterClass;
@@ -80,11 +81,11 @@ public class TestContainerMapping {
 
   @Test
   public void testallocateContainer() throws Exception {
-    Pipeline pipeline = mapping.allocateContainer(
+    ContainerInfo containerInfo = mapping.allocateContainer(
         xceiverClientManager.getType(),
         xceiverClientManager.getFactor(),
         UUID.randomUUID().toString());
-    Assert.assertNotNull(pipeline);
+    Assert.assertNotNull(containerInfo);
   }
 
   @Test
@@ -97,13 +98,15 @@ public class TestContainerMapping {
      */
     Set<String> pipelineList = new TreeSet<>();
     for (int x = 0; x < 30; x++) {
-      Pipeline pipeline = mapping.allocateContainer(
+      ContainerInfo containerInfo = mapping.allocateContainer(
           xceiverClientManager.getType(),
           xceiverClientManager.getFactor(),
           UUID.randomUUID().toString());
 
-      Assert.assertNotNull(pipeline);
-      pipelineList.add(pipeline.getLeader().getDatanodeUuid());
+      Assert.assertNotNull(containerInfo);
+      Assert.assertNotNull(containerInfo.getPipeline());
+      pipelineList.add(containerInfo.getPipeline().getLeader()
+          .getDatanodeUuid());
     }
     Assert.assertTrue(pipelineList.size() > 5);
   }
@@ -113,9 +116,9 @@ public class TestContainerMapping {
     String containerName = UUID.randomUUID().toString();
     Pipeline pipeline = mapping.allocateContainer(
         xceiverClientManager.getType(),
-        xceiverClientManager.getFactor(), containerName);
+        xceiverClientManager.getFactor(), containerName).getPipeline();
     Assert.assertNotNull(pipeline);
-    Pipeline newPipeline = mapping.getContainer(containerName);
+    Pipeline newPipeline = mapping.getContainer(containerName).getPipeline();
     Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(),
         newPipeline.getLeader().getDatanodeUuid());
   }
@@ -125,7 +128,7 @@ public class TestContainerMapping {
     String containerName = UUID.randomUUID().toString();
     Pipeline pipeline = mapping.allocateContainer(
         xceiverClientManager.getType(),
-        xceiverClientManager.getFactor(), containerName);
+        xceiverClientManager.getFactor(), containerName).getPipeline();
     Assert.assertNotNull(pipeline);
     thrown.expectMessage("Specified container already exists.");
     mapping.allocateContainer(xceiverClientManager.getType(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8333602a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
index 66e5c1e..430d34b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestContainerPlacement.java
@@ -154,7 +154,7 @@ public class TestContainerPlacement {
       String container1 = UUID.randomUUID().toString();
       Pipeline pipeline1 = containerManager.allocateContainer(
           xceiverClientManager.getType(),
-          xceiverClientManager.getFactor(), container1);
+          xceiverClientManager.getFactor(), container1).getPipeline();
       assertEquals(xceiverClientManager.getFactor().getNumber(),
           pipeline1.getMachines().size());
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org