Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2018/01/30 19:03:40 UTC

[2/2] hadoop git commit: HDFS-12522. Ozone: Remove the Priority Queues used in the Container State Manager. Contributed by Anu Engineer.

HDFS-12522. Ozone: Remove the Priority Queues used in the Container State Manager. Contributed by Anu Engineer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7955ee40
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7955ee40
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7955ee40

Branch: refs/heads/HDFS-7240
Commit: 7955ee40457fbbe7453071e3aed6d12601fd7953
Parents: c177824
Author: Anu Engineer <ae...@apache.org>
Authored: Tue Jan 30 10:57:10 2018 -0800
Committer: Anu Engineer <ae...@apache.org>
Committed: Tue Jan 30 10:57:10 2018 -0800

----------------------------------------------------------------------
 .../container/ContainerStates/ContainerID.java  |  97 +++++
 .../container/ContainerStates/package-info.java |  22 +
 .../container/common/helpers/ContainerInfo.java |  52 ++-
 .../src/main/proto/Ozone.proto                  |   1 +
 hadoop-hdfs-project/hadoop-hdfs/pom.xml         |  12 +
 .../common/helpers/ContainerReport.java         |  13 +
 .../cli/container/ContainerCommandHandler.java  |   2 +-
 .../ozone/scm/container/ContainerMapping.java   |  75 ++--
 .../scm/container/ContainerStateManager.java    | 411 +++++++------------
 .../ContainerStates/ContainerAttribute.java     | 244 +++++++++++
 .../ContainerStates/ContainerState.java         |  96 +++++
 .../ContainerStates/ContainerStateMap.java      | 402 ++++++++++++++++++
 .../container/ContainerStates/package-info.java |  22 +
 .../StorageContainerDatanodeProtocol.proto      |   1 +
 .../ozone/container/ContainerTestHelper.java    |   4 +-
 .../ReplicationDatanodeStateManager.java        |   3 +
 .../ozone/container/common/TestEndPoint.java    |   2 +
 .../BenchmarkContainerStateMap.java             | 180 ++++++++
 .../ContainerStates/TestContainerAttribute.java | 139 +++++++
 .../ContainerStates/TestContainerStateMap.java  | 226 ++++++++++
 .../scm/container/TestContainerMapping.java     |  54 ++-
 .../container/TestContainerStateManager.java    |  56 ++-
 22 files changed, 1751 insertions(+), 363 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerID.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerID.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerID.java
new file mode 100644
index 0000000..a51d3b7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerID.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.scm.container.ContainerStates;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.util.MathUtils;
+
+/**
+ * Container ID is a value in the range 1..MAX_CONTAINER_ID.
+ * <p>
+ * We are creating a specific type for this to avoid mixing this with
+ * normal integers in code.
+ */
+public class ContainerID implements Comparable {
+
+  private final long id;
+
+  /**
+   * Constructs ContainerID.
+   *
+   * @param id long
+   */
+  public ContainerID(long id) {
+    Preconditions.checkState(id > 0,
+        "Container ID should be a positive int");
+    this.id = id;
+  }
+
+  /**
+   * Factory method for creation of ContainerID.
+   * @param containerID  long
+   * @return ContainerID.
+   */
+  public static ContainerID valueof(long containerID) {
+    Preconditions.checkState(containerID > 0);
+    return new ContainerID(containerID);
+  }
+
+  /**
+   * Returns the long value of the container ID.
+   *
+   * @return long
+   */
+  public long getId() {
+    return id;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ContainerID that = (ContainerID) o;
+
+    return id == that.id;
+  }
+
+  @Override
+  public int hashCode() {
+    return MathUtils.hash(id);
+  }
+
+  @Override
+  public int compareTo(Object o) {
+    Preconditions.checkNotNull(o);
+    if (o instanceof ContainerID) {
+      return Long.compare(this.getId(), ((ContainerID) o).getId());
+    }
+    throw new IllegalArgumentException("Object o should be an instance " +
+        "of ContainerID");
+  }
+
+  @Override
+  public String toString() {
+    return "id=" + id;
+  }
+}
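
For readers of the patch: ContainerID is a strongly typed wrapper so that
container IDs are not mixed with plain longs in SCM code. A minimal usage
sketch follows; it is not part of this commit (the class name is hypothetical)
and only assumes the new ContainerID class above is on the classpath.

    import java.util.NavigableSet;
    import java.util.TreeSet;

    import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerID;

    public final class ContainerIdSketch {
      public static void main(String[] args) {
        // A typed ID cannot be accidentally mixed with other long values.
        NavigableSet<ContainerID> ids = new TreeSet<>();
        ids.add(ContainerID.valueof(3));
        ids.add(ContainerID.valueof(1));
        ids.add(ContainerID.valueof(2));
        // Iteration order is defined by ContainerID.compareTo.
        for (ContainerID id : ids) {
          System.out.println(id);   // prints "id=<value>" via toString()
        }
      }
    }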

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java
new file mode 100644
index 0000000..61f5609
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+/**
+ * Container States.
+ */
+package org.apache.hadoop.ozone.scm.container.ContainerStates;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java
index fc51c80..11d4438 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/container/common/helpers/ContainerInfo.java
@@ -18,14 +18,18 @@
 
 package org.apache.hadoop.scm.container.common.helpers;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerID;
 import org.apache.hadoop.util.Time;
 
 import java.util.Comparator;
 
-/** Class wraps ozone container info. */
+/**
+ * Class wraps ozone container info.
+ */
 public class ContainerInfo
     implements Comparator<ContainerInfo>, Comparable<ContainerInfo> {
   private OzoneProtos.LifeCycleState state;
@@ -40,8 +44,9 @@ public class ContainerInfo
   private long stateEnterTime;
   private String owner;
   private String containerName;
-
+  private long containerID;
   ContainerInfo(
+      long containerID,
       final String containerName,
       OzoneProtos.LifeCycleState state,
       Pipeline pipeline,
@@ -50,6 +55,7 @@ public class ContainerInfo
       long numberOfKeys,
       long stateEnterTime,
       String owner) {
+    this.containerID = containerID;
     this.containerName = containerName;
     this.pipeline = pipeline;
     this.allocatedBytes = allocatedBytes;
@@ -77,9 +83,14 @@ public class ContainerInfo
     builder.setStateEnterTime(info.getStateEnterTime());
     builder.setOwner(info.getOwner());
     builder.setContainerName(info.getContainerName());
+    builder.setContainerID(info.getContainerID());
     return builder.build();
   }
 
+  public long getContainerID() {
+    return containerID;
+  }
+
   public String getContainerName() {
     return containerName;
   }
@@ -88,6 +99,10 @@ public class ContainerInfo
     return state;
   }
 
+  public void setState(OzoneProtos.LifeCycleState state) {
+    this.state = state;
+  }
+
   public long getStateEnterTime() {
     return stateEnterTime;
   }
@@ -100,6 +115,16 @@ public class ContainerInfo
     return allocatedBytes;
   }
 
+  /**
+   * Updates the allocated bytes.
+   *
+   * @param size - newly allocated bytes; a negative size can be used in
+   * the case of deletes.
+   */
+  public void updateAllocatedBytes(long size) {
+    this.allocatedBytes += size;
+  }
+
   public long getUsedBytes() {
     return usedBytes;
   }
@@ -108,8 +133,13 @@ public class ContainerInfo
     return numberOfKeys;
   }
 
+  public ContainerID containerID() {
+    return new ContainerID(getContainerID());
+  }
+
   /**
    * Gets the last used time from SCM's perspective.
+   *
    * @return time in milliseconds.
    */
   public long getLastUsed() {
@@ -135,6 +165,7 @@ public class ContainerInfo
     builder.setNumberOfKeys(getNumberOfKeys());
     builder.setState(state);
     builder.setStateEnterTime(stateEnterTime);
+    builder.setContainerID(getContainerID());
 
     if (getOwner() != null) {
       builder.setOwner(getOwner());
@@ -180,7 +211,7 @@ public class ContainerInfo
         // TODO : Fix this later. If we add these factors some tests fail.
         // So Commenting this to continue and will enforce this with
         // Changes in pipeline where we remove Container Name to
-        // SCMContainerinfo from Pipline.
+        // SCMContainerinfo from Pipeline.
         // .append(pipeline.getFactor(), that.pipeline.getFactor())
         // .append(pipeline.getType(), that.pipeline.getType())
         .append(owner, that.owner)
@@ -233,7 +264,9 @@ public class ContainerInfo
     return this.compare(this, o);
   }
 
-  /** Builder class for ContainerInfo. */
+  /**
+   * Builder class for ContainerInfo.
+   */
   public static class Builder {
     private OzoneProtos.LifeCycleState state;
     private Pipeline pipeline;
@@ -243,6 +276,13 @@ public class ContainerInfo
     private long stateEnterTime;
     private String owner;
     private String containerName;
+    private long containerID;
+
+    public Builder setContainerID(long id) {
+      Preconditions.checkState(id >= 0);
+      this.containerID = id;
+      return this;
+    }
 
     public Builder setState(OzoneProtos.LifeCycleState lifeCycleState) {
       this.state = lifeCycleState;
@@ -286,8 +326,8 @@ public class ContainerInfo
 
     public ContainerInfo build() {
       return new
-          ContainerInfo(containerName, state, pipeline, allocated, used,
-          keys, stateEnterTime, owner);
+          ContainerInfo(containerID, containerName, state, pipeline,
+          allocated, used, keys, stateEnterTime, owner);
     }
   }
 }
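
The Builder now carries the new containerID field end to end. The following
sketch of constructing a ContainerInfo with the field is illustrative only and
not part of this commit; it assumes a Pipeline instance is available from the
surrounding SCM code and uses the package locations as they appear in this
patch.

    import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
    import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
    import org.apache.hadoop.scm.container.common.helpers.Pipeline;
    import org.apache.hadoop.util.Time;

    public final class ContainerInfoSketch {
      // Builds an OPEN container record; setContainerID is new in this patch.
      public static ContainerInfo newOpenContainer(Pipeline pipeline, long id) {
        return new ContainerInfo.Builder()
            .setContainerID(id)
            .setContainerName(pipeline.getContainerName())
            .setPipeline(pipeline)
            .setState(OzoneProtos.LifeCycleState.OPEN)
            .setAllocatedBytes(0)
            .setUsedBytes(0)
            .setNumberOfKeys(0)
            .setStateEnterTime(Time.monotonicNow())
            .setOwner("OZONE")
            .build();
      }
    }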

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
index e5bfd36..7e0c954 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/Ozone.proto
@@ -133,6 +133,7 @@ message SCMContainerInfo {
     required uint64 numberOfKeys = 6;
     optional int64 stateEnterTime = 7;
     required string owner = 8;
+    required int64 containerID = 9;
 }
 
 message GetScmInfoRequestProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index b5e22a7..bdaa789 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -244,6 +244,18 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
         <artifactId>assertj-core</artifactId>
         <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-core</artifactId>
+      <version>1.19</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.openjdk.jmh</groupId>
+      <artifactId>jmh-generator-annprocess</artifactId>
+      <version>1.19</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
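
The two JMH dependencies above back the new BenchmarkContainerStateMap test.
For orientation, a minimal JMH benchmark has the following shape; the class
name and workload here are hypothetical stand-ins, not code from this commit.

    import java.util.concurrent.atomic.AtomicLong;

    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.State;
    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    @State(Scope.Thread)
    public class InsertBenchmark {
      private final AtomicLong counter = new AtomicLong();

      @Benchmark
      public long insertOne() {
        // Stand-in workload; the real benchmark exercises ContainerStateMap.
        return counter.incrementAndGet();
      }

      public static void main(String[] args) throws RunnerException {
        Options opts = new OptionsBuilder()
            .include(InsertBenchmark.class.getSimpleName())
            .forks(1)
            .build();
        new Runner(opts).run();
      }
    }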

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
index 87a2493..81c41bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerReport.java
@@ -35,6 +35,17 @@ public class ContainerReport {
   private long writeCount;
   private long readBytes;
   private long writeBytes;
+  private long containerID;
+
+  public long getContainerID() {
+    return containerID;
+  }
+
+  public void setContainerID(long containerID) {
+    this.containerID = containerID;
+  }
+
+
 
 
   /**
@@ -87,6 +98,7 @@ public class ContainerReport {
       report.setWriteBytes(info.getWriteBytes());
     }
 
+    report.setContainerID(info.getContainerID());
     return report;
   }
 
@@ -200,6 +212,7 @@ public class ContainerReport {
         .setWriteCount(this.getWriteCount())
         .setWriteBytes(this.getWriteBytes())
         .setFinalhash(this.getFinalhash())
+        .setContainerID(this.getContainerID())
         .build();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ContainerCommandHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ContainerCommandHandler.java
index 70448df..7838b1f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ContainerCommandHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/container/ContainerCommandHandler.java
@@ -40,7 +40,7 @@ import static org.apache.hadoop.ozone.scm.cli.container
     .ListContainerHandler.CONTAINER_LIST;
 
 /**
- * The handler class of container-specific commands, e.g. createContainer.
+ * The handler class of container-specific commands, e.g. addContainer.
  */
 public class ContainerCommandHandler extends OzoneCommandHandler {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
index c8875ba..fe86064 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements. See the NOTICE file distributed with this
  * work for additional information regarding copyright ownership. The ASF
@@ -29,8 +29,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.ozone.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
@@ -55,7 +54,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_CHANGE_CONTAINER_STATE;
 
 /**
  * Mapping class contains the mapping from a name to a pipeline mapping. This
@@ -82,19 +82,18 @@ public class ContainerMapping implements Mapping {
    *
    * @param nodeManager - NodeManager so that we can get the nodes that are
    * healthy to place new
-   *     containers.
+   * containers.
    * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache
    * its nodes. This is
-   *     passed to LevelDB and this memory is allocated in Native code space.
-   *     CacheSize is specified
-   *     in MB.
-   * @throws IOException
+   * passed to LevelDB and this memory is allocated in Native code space.
+   * CacheSize is specified
+   * in MB.
+   * @throws IOException on Failure.
    */
   @SuppressWarnings("unchecked")
   public ContainerMapping(
       final Configuration conf, final NodeManager nodeManager, final int
-      cacheSizeMB)
-      throws IOException {
+      cacheSizeMB) throws IOException {
     this.nodeManager = nodeManager;
     this.cacheSize = cacheSizeMB;
 
@@ -113,7 +112,7 @@ public class ContainerMapping implements Mapping {
 
     this.pipelineSelector = new PipelineSelector(nodeManager, conf);
     this.containerStateManager =
-        new ContainerStateManager(conf, this, this.cacheSize * OzoneConsts.MB);
+        new ContainerStateManager(conf, this);
     this.containerCloseThreshold = conf.getFloat(
         ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD,
         ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT);
@@ -128,7 +127,9 @@ public class ContainerMapping implements Mapping {
     containerLeaseManager.start();
   }
 
-  /** {@inheritDoc} */
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public ContainerInfo getContainer(final String containerName) throws
       IOException {
@@ -142,16 +143,19 @@ public class ContainerMapping implements Mapping {
             "Specified key does not exist. key : " + containerName,
             SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
       }
-      containerInfo =
-          ContainerInfo.fromProtobuf(OzoneProtos.SCMContainerInfo.PARSER
-              .parseFrom(containerBytes));
+
+      OzoneProtos.SCMContainerInfo temp = OzoneProtos.SCMContainerInfo.PARSER
+          .parseFrom(containerBytes);
+      containerInfo = ContainerInfo.fromProtobuf(temp);
       return containerInfo;
     } finally {
       lock.unlock();
     }
   }
 
-  /** {@inheritDoc} */
+  /**
+   * {@inheritDoc}
+   */
   @Override
   public List<ContainerInfo> listContainer(String startName,
       String prefixName, int count) throws IOException {
@@ -188,7 +192,7 @@ public class ContainerMapping implements Mapping {
    *
    * @param replicationFactor - replication factor of the container.
    * @param containerName - Name of the container.
-   * @param owner
+   * @param owner - The string name of the Service that owns this container.
    * @return - Pipeline that makes up this container.
    * @throws IOException - Exception
    */
@@ -201,7 +205,8 @@ public class ContainerMapping implements Mapping {
       throws IOException {
     Preconditions.checkNotNull(containerName);
     Preconditions.checkState(!containerName.isEmpty());
-    ContainerInfo containerInfo = null;
+
+    ContainerInfo containerInfo;
     if (!nodeManager.isOutOfChillMode()) {
       throw new SCMException(
           "Unable to create container while in chill mode",
@@ -219,7 +224,8 @@ public class ContainerMapping implements Mapping {
       }
       containerInfo =
           containerStateManager.allocateContainer(
-              pipelineSelector, type, replicationFactor, containerName, owner);
+              pipelineSelector, type, replicationFactor, containerName,
+              owner);
       containerStore.put(
           containerName.getBytes(encoding), containerInfo.getProtobuf()
               .toByteArray());
@@ -234,8 +240,8 @@ public class ContainerMapping implements Mapping {
    *
    * @param containerName - Container name
    * @throws IOException if container doesn't exist or container store failed
-   * to delete the
-   *     specified key.
+   *                     to delete the
+   *                     specified key.
    */
   @Override
   public void deleteContainer(String containerName) throws IOException {
@@ -255,7 +261,9 @@ public class ContainerMapping implements Mapping {
     }
   }
 
-  /** {@inheritDoc} Used by client to update container state on SCM. */
+  /**
+   * {@inheritDoc} Used by client to update container state on SCM.
+   */
   @Override
   public OzoneProtos.LifeCycleState updateContainerState(
       String containerName, OzoneProtos.LifeCycleEvent event) throws
@@ -327,8 +335,10 @@ public class ContainerMapping implements Mapping {
     }
   }
 
-  /** + * Returns the container State Manager. + * + * @return
-   * ContainerStateManager + */
+  /**
+   * Returns the container State Manager.
+   * @return ContainerStateManager
+   */
   @Override
   public ContainerStateManager getStateManager() {
     return containerStateManager;
@@ -374,6 +384,7 @@ public class ContainerMapping implements Mapping {
           builder.setNumberOfKeys(containerInfo.getKeyCount());
           builder.setState(oldInfo.getState());
           builder.setStateEnterTime(oldInfo.getStateEnterTime());
+          builder.setContainerID(oldInfo.getContainerID());
           if (oldInfo.getOwner() != null) {
             builder.setOwner(oldInfo.getOwner());
           }
@@ -393,15 +404,16 @@ public class ContainerMapping implements Mapping {
                 OzoneProtos.LifeCycleEvent.FINALIZE);
             if (state != OzoneProtos.LifeCycleState.CLOSING) {
               LOG.error("Failed to close container {}, reason : Not able to " +
-                      "update container state, current container state: {}." +
-                      "in state {}", containerInfo.getContainerName(), state);
+                  "update container state, current container state: {}.",
+                  containerInfo.getContainerName(), state);
             }
           }
         } else {
           // Container not found in our container db.
           LOG.error("Error while processing container report from datanode :" +
               " {}, for container: {}, reason: container doesn't exist in" +
-              "container database.");
+              "container database.", datanodeID,
+              containerInfo.getContainerName());
         }
       } finally {
         lock.unlock();
@@ -409,14 +421,11 @@ public class ContainerMapping implements Mapping {
     }
   }
 
-
   /**
    * Closes this stream and releases any system resources associated with it.
    * If the stream is
    * already closed then invoking this method has no effect.
    *
-   * <p>
-   *
    * <p>As noted in {@link AutoCloseable#close()}, cases where the close may
    * fail require careful
    * attention. It is strongly advised to relinquish the underlying resources
@@ -445,7 +454,7 @@ public class ContainerMapping implements Mapping {
    * containerStateManager, when closing ContainerMapping, we need to update
    * this in the container store.
    *
-   * @throws IOException
+   * @throws IOException  on failure.
    */
   @VisibleForTesting
   public void flushContainerInfo() throws IOException {
@@ -476,7 +485,7 @@ public class ContainerMapping implements Mapping {
           containerStore.put(dbKey, newInfo.getProtobuf().toByteArray());
         } else {
           LOG.debug("Container state manager has container {} but not found " +
-              "in container store, a deleted container?",
+                  "in container store, a deleted container?",
               info.getContainerName());
         }
       } catch (IOException ioe) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
index 0240f69..6fb5872 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with this
  * work for additional information regarding copyright ownership.  The ASF
@@ -17,16 +17,19 @@
 
 package org.apache.hadoop.ozone.scm.container;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.common.statemachine.StateMachine;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerID;
+import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerState;
+import org.apache.hadoop.ozone.scm.container.ContainerStates.ContainerStateMap;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.scm.ScmConfigKeys;
@@ -39,26 +42,15 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.PriorityQueue;
 import java.util.List;
-import java.util.Arrays;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
-import static org.apache.hadoop.ozone.scm.exceptions
-    .SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_CHANGE_CONTAINER_STATE;
 
 /**
  * A container state manager keeps track of container states and returns
@@ -86,7 +78,7 @@ import static org.apache.hadoop.ozone.scm.exceptions
  * this container.
  * <p>
  * 4. Once the creation of the container is complete, the client will make
- * another call to the SCM, this time specifing the containerName and the
+ * another call to the SCM, this time specifying the containerName and the
  * COMPLETE_CREATE as the Event.
  * <p>
  * 5. With COMPLETE_CREATE event, the container moves to an Open State. This is
@@ -125,14 +117,9 @@ public class ContainerStateManager implements Closeable {
       OzoneProtos.LifeCycleEvent> stateMachine;
 
   private final long containerSize;
-  private final long cacheSize;
-  private final long blockSize;
-
-  // A map that maintains the ContainerKey to Containers of that type ordered
-  // by last access time.
-  private final ReadWriteLock lock;
-  private final Queue<ContainerInfo> containerCloseQueue;
-  private Map<ContainerKey, PriorityQueue<ContainerInfo>> containers;
+  private final ConcurrentHashMap<ContainerState, ContainerID> lastUsedMap;
+  private final ContainerStateMap containers;
+  private final AtomicLong containerCount;
 
   /**
    * Constructs a Container State Manager that tracks all containers owned by
@@ -140,9 +127,9 @@ public class ContainerStateManager implements Closeable {
    * <p>
    * TODO : Add Container Tags so we know which containers are owned by SCM.
    */
+  @SuppressWarnings("unchecked")
   public ContainerStateManager(Configuration configuration,
-      Mapping containerMapping, final long cacheSize) throws IOException {
-    this.cacheSize = cacheSize;
+      Mapping containerMapping) {
 
     // Initialize the container state machine.
     Set<OzoneProtos.LifeCycleState> finalStates = new HashSet();
@@ -160,68 +147,46 @@ public class ContainerStateManager implements Closeable {
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
 
-    this.blockSize = OzoneConsts.MB * configuration.getLong(
-        OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB,
-        OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT);
-
-    lock = new ReentrantReadWriteLock();
-    containers = new HashMap<>();
+    lastUsedMap = new ConcurrentHashMap<>();
+    containerCount = new AtomicLong(0);
+    containers = new ContainerStateMap();
     loadExistingContainers(containerMapping);
-    containerCloseQueue = new ConcurrentLinkedQueue<>();
   }
 
-  /**
-   * Creates containers maps of following types.
-   * <p>
-   * OZONE  of type {Ratis, StandAlone, Chained} for each of these {ALLOCATED,
-   * CREATING, OPEN, CLOSED, DELETING, DELETED}  container states
-   * <p>
-   * CBLOCK of type {Ratis, StandAlone, Chained} for each of these {ALLOCATED,
-   * CREATING, OPEN, CLOSED, DELETING, DELETED}  container states
-   * <p>
-   * Commented out for now: HDFS of type {Ratis, StandAlone, Chained} for each
-   * of these {ALLOCATED, CREATING, OPEN, CLOSED, DELETING, DELETED}  container
-   * states
-   */
-  private void initializeContainerMaps(String owner) {
-    // Called only from Ctor path, hence no lock is held.
-    Preconditions.checkNotNull(containers);
-    for (ReplicationType type : ReplicationType.values()) {
-      for (ReplicationFactor factor : ReplicationFactor.values()) {
-        for (LifeCycleState state : LifeCycleState.values()) {
-          ContainerKey key = new ContainerKey(owner, type, factor, state);
-          PriorityQueue<ContainerInfo> queue = new PriorityQueue<>();
-          containers.put(key, queue);
-        }
+  private void loadExistingContainers(Mapping containerMapping) {
+
+    List<ContainerInfo> containerList;
+    try {
+      containerList = containerMapping.listContainer(null,
+          null, Integer.MAX_VALUE);
+
+      // if there are no containers to load, let us return.
+      if (containerList == null || containerList.size() == 0) {
+        LOG.info("No containers to load for this cluster.");
+        return;
+      }
+    } catch (IOException e) {
+      if (!e.getMessage().equals("No container exists in current db")) {
+        LOG.error("Could not list the containers", e);
       }
+      return;
     }
-  }
 
-  /**
-   * Load containers from the container store into the containerMaps.
-   *
-   * @param containerMapping -- Mapping object containing container store.
-   */
-  private void loadExistingContainers(Mapping containerMapping) {
     try {
-      List<String> ownerList = new ArrayList<>();
-      List<ContainerInfo> containerList =
-          containerMapping.listContainer(null, null, Integer.MAX_VALUE);
+      long maxID = 0;
       for (ContainerInfo container : containerList) {
-        String owner = container.getOwner();
-        if (ownerList.isEmpty() || !ownerList.contains(owner)) {
-          ownerList.add(owner);
-          initializeContainerMaps(owner);
+        containers.addContainer(container);
+
+        if (maxID < container.getContainerID()) {
+          maxID = container.getContainerID();
         }
-        ContainerKey key =
-            new ContainerKey(owner, container.getPipeline().getType(),
-                container.getPipeline().getFactor(), container.getState());
-        containers.get(key).add(container);
-      }
-    } catch (IOException e) {
-      if (!e.getMessage().equals("No container exists in current db")) {
-        LOG.info("Could not list the containers", e);
+
+        containerCount.set(maxID);
       }
+    } catch (SCMException ex) {
+      LOG.error("Unable to create a container information. ", ex);
+      // Fix me, what is the proper shutdown procedure for SCM ??
+      // System.exit(1) // Should we exit here?
     }
   }
 
@@ -230,9 +195,11 @@ public class ContainerStateManager implements Closeable {
    *
    * @return the list of all container info.
    */
-  List<ContainerInfo> getAllContainers() {
+  public List<ContainerInfo> getAllContainers() {
     List<ContainerInfo> list = new ArrayList<>();
-    containers.forEach((key, value) -> list.addAll(value));
+
+    //No Locking needed since the return value is an immutable map.
+    containers.getContainerMap().forEach((key, value) -> list.add(value));
     return list;
   }
 
@@ -315,7 +282,7 @@ public class ContainerStateManager implements Closeable {
    * @param replicationFactor - Replication replicationFactor.
    * @param containerName - Container Name.
    * @return Container Info.
-   * @throws IOException
+   * @throws IOException  on Failure.
    */
   public ContainerInfo allocateContainer(PipelineSelector selector, OzoneProtos
       .ReplicationType type, OzoneProtos.ReplicationFactor replicationFactor,
@@ -335,22 +302,11 @@ public class ContainerStateManager implements Closeable {
         .setNumberOfKeys(0)
         .setStateEnterTime(Time.monotonicNow())
         .setOwner(owner)
+        .setContainerID(containerCount.incrementAndGet())
         .build();
     Preconditions.checkNotNull(containerInfo);
-    lock.writeLock().lock();
-    try {
-      ContainerKey key = new ContainerKey(owner, type, replicationFactor,
-          containerInfo.getState());
-      PriorityQueue<ContainerInfo> queue = containers.get(key);
-      if (queue == null) {
-        initializeContainerMaps(owner);
-        queue = containers.get(key);
-      }
-      queue.add(containerInfo);
-      LOG.trace("New container allocated: {}", containerInfo);
-    } finally {
-      lock.writeLock().unlock();
-    }
+    containers.addContainer(containerInfo);
+    LOG.trace("New container allocated: {}", containerInfo);
     return containerInfo;
   }
 
@@ -360,7 +316,7 @@ public class ContainerStateManager implements Closeable {
    * @param info - ContainerInfo
    * @param event - LifeCycle Event
    * @return Updated ContainerInfo.
-   * @throws SCMException
+   * @throws SCMException  on Failure.
    */
   public ContainerInfo updateContainerState(ContainerInfo
       info, OzoneProtos.LifeCycleEvent event) throws SCMException {
@@ -369,7 +325,8 @@ public class ContainerStateManager implements Closeable {
       newState = this.stateMachine.getNextState(info.getState(), event);
     } catch (InvalidStateTransitionException ex) {
       String error = String.format("Failed to update container state %s, " +
-              "reason: invalid state transition from state: %s upon event: %s.",
+              "reason: invalid state transition from state: %s upon " +
+              "event: %s.",
           info.getPipeline().getContainerName(), info.getState(), event);
       LOG.error(error);
       throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE);
@@ -377,191 +334,119 @@ public class ContainerStateManager implements Closeable {
 
     // This is a post condition after executing getNextState.
     Preconditions.checkNotNull(newState);
-    Pipeline pipeline = info.getPipeline();
-
-    ContainerKey oldKey = new ContainerKey(info.getOwner(), pipeline.getType(),
-        pipeline.getFactor(), info.getState());
-
-    ContainerKey newKey = new ContainerKey(info.getOwner(), pipeline.getType(),
-        pipeline.getFactor(), newState);
-    lock.writeLock().lock();
-    try {
-
-      PriorityQueue<ContainerInfo> currentQueue = containers.get(oldKey);
-      // This should never happen, since we have initialized the map and
-      // queues to all possible states. No harm in asserting that info.
-      Preconditions.checkNotNull(currentQueue);
-
-      // TODO : Should we read this container info from the database if this
-      // is missing in the queue?. Right now we just add it into the queue.
-      // We also need a background thread that will remove unused containers
-      // from memory after 24 hours.  This is really a low priority work item
-      // since typical clusters will have less than 10's of millions of open
-      // containers at a given time, which we can easily keep in memory.
-
-      if (currentQueue.contains(info)) {
-        currentQueue.remove(info);
-      }
+    containers.updateState(info, info.getState(), newState);
+    return containers.getContainerInfo(info);
+  }
 
-      PriorityQueue<ContainerInfo> nextQueue = containers.get(newKey);
-      Preconditions.checkNotNull(nextQueue);
-
-      ContainerInfo containerInfo = new ContainerInfo.Builder()
-          .setContainerName(info.getContainerName())
-          .setState(newState)
-          .setPipeline(info.getPipeline())
-          .setAllocatedBytes(info.getAllocatedBytes())
-          .setUsedBytes(info.getUsedBytes())
-          .setNumberOfKeys(info.getNumberOfKeys())
-          .setStateEnterTime(Time.monotonicNow())
-          .setOwner(info.getOwner())
-          .build();
-      Preconditions.checkNotNull(containerInfo);
-      nextQueue.add(containerInfo);
-
-      return containerInfo;
-    } finally {
-      lock.writeLock().unlock();
-    }
+  /**
+   * Update the container State.
+   * @param info - Container Info
+   * @return  ContainerInfo
+   * @throws SCMException - on Error.
+   */
+  public ContainerInfo updateContainerInfo(ContainerInfo info)
+      throws SCMException {
+    containers.updateContainerInfo(info);
+    return containers.getContainerInfo(info);
   }
 
+
   /**
    * Return a container matching the attributes specified.
    *
    * @param size - Space needed in the Container.
-   * @param owner - Owner of the container {OZONE, CBLOCK}
+   * @param owner - Owner of the container - A specific nameservice.
    * @param type - Replication Type {StandAlone, Ratis}
    * @param factor - Replication Factor {ONE, THREE}
    * @param state - State of the Container-- {Open, Allocated etc.}
-   * @return ContainerInfo
+   * @return ContainerInfo, null if there is no match found.
    */
   public ContainerInfo getMatchingContainer(final long size,
       String owner, ReplicationType type, ReplicationFactor factor,
       LifeCycleState state) {
-    ContainerKey key = new ContainerKey(owner, type, factor, state);
-    lock.writeLock().lock();
-    try {
-      PriorityQueue<ContainerInfo> queue = containers.get(key);
-      if (queue == null) {
-        initializeContainerMaps(owner);
-        queue = containers.get(key);
-      }
-      if (queue.size() == 0) {
-        // We don't have any Containers of this type.
-        return null;
-      }
-      Iterator<ContainerInfo> iter = queue.iterator();
-      // Two assumptions here.
-      // 1. The Iteration on the heap is in ordered by the last used time.
-      // 2. We remove and add the node back to push the node to the end of
-      // the queue.
-
-      while (iter.hasNext()) {
-        ContainerInfo info = iter.next();
-        if (info.getAllocatedBytes() + size <= this.containerSize) {
-          queue.remove(info);
-          info.allocate(size);
-          info.updateLastUsedTime();
-          queue.add(info);
-
-          return info;
-        }
-      }
 
-    } finally {
-      lock.writeLock().unlock();
+    // Find containers that match the query spec, if no match return null.
+    NavigableSet<ContainerID> matchingSet =
+        containers.getMatchingContainerIDs(state, owner, factor, type);
+    if (matchingSet == null || matchingSet.size() == 0) {
+      return null;
     }
-    return null;
-  }
 
-  @VisibleForTesting
-  public List<ContainerInfo> getMatchingContainers(String owner,
-      ReplicationType type, ReplicationFactor factor, LifeCycleState state) {
-    ContainerKey key = new ContainerKey(owner, type, factor, state);
-    lock.readLock().lock();
-    try {
-      if (containers.get(key) == null) {
-        return null;
-      } else {
-        return Arrays.asList((ContainerInfo[]) containers.get(key)
-            .toArray(new ContainerInfo[0]));
-      }
-    } catch (Exception e) {
-      LOG.error("Could not get matching containers", e);
-    } finally {
-      lock.readLock().unlock();
+    // Get the last used container and find container above the last used
+    // container ID.
+    ContainerState key = new ContainerState(owner, type, factor);
+    ContainerID lastID = lastUsedMap.get(key);
+    if(lastID == null) {
+      lastID = matchingSet.first();
     }
-    return null;
-  }
-
-  @Override
-  public void close() throws IOException {
-    //TODO: update container metadata db with actual allocated bytes values.
-  }
 
-  /**
-   * Class that acts as the container Key.
-   */
-  private static class ContainerKey {
-    private final LifeCycleState state;
-    private final ReplicationType type;
-    private final String owner;
-    private final ReplicationFactor replicationFactor;
-
-    /**
-     * Constructs a Container Key.
-     *
-     * @param owner - Container Owners
-     * @param type - Replication Type.
-     * @param factor - Replication Factors
-     * @param state - LifeCycle State
-     */
-    ContainerKey(String owner, ReplicationType type,
-        ReplicationFactor factor, LifeCycleState state) {
-      this.state = state;
-      this.type = type;
-      this.owner = owner;
-      this.replicationFactor = factor;
+    // There is a small issue here. The first time, we will skip the first
+    // container. But in most cases it will not matter.
+    NavigableSet<ContainerID> resultSet = matchingSet.tailSet(lastID, false);
+    if (resultSet.size() == 0) {
+      resultSet = matchingSet;
     }
 
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
+    ContainerInfo selectedContainer =
+        findContainerWithSpace(size, resultSet, owner);
+    if (selectedContainer == null) {
 
-      ContainerKey that = (ContainerKey) o;
+      // If we did not find any space in the tailSet, we need to look for
+      // space in the headSet. We pass true to handle the case of a lone
+      // container that has space: we ignored the last used container under
+      // the assumption that we can find other containers with space, but
+      // that assumption does not hold if there is only a single container.
+      // Hence we need to include the last used container as the last
+      // element in the sorted set.
 
-      return new EqualsBuilder()
-          .append(state, that.state)
-          .append(type, that.type)
-          .append(owner, that.owner)
-          .append(replicationFactor, that.replicationFactor)
-          .isEquals();
+      resultSet = matchingSet.headSet(lastID, true);
+      selectedContainer = findContainerWithSpace(size, resultSet, owner);
     }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder(137, 757)
-          .append(state)
-          .append(type)
-          .append(owner)
-          .append(replicationFactor)
-          .toHashCode();
+    // Update the allocated Bytes on this container.
+    if(selectedContainer != null) {
+      selectedContainer.updateAllocatedBytes(size);
     }
+    return selectedContainer;
 
-    @Override
-    public String toString() {
-      return "ContainerKey{" +
-          "state=" + state +
-          ", type=" + type +
-          ", owner=" + owner +
-          ", replicationFactor=" + replicationFactor +
-          '}';
+  }
+
+  private ContainerInfo findContainerWithSpace(long size,
+      NavigableSet<ContainerID> searchSet, String owner) {
+    // Get the container with space to meet our request.
+    for (ContainerID id : searchSet) {
+      ContainerInfo containerInfo = containers.getContainerInfo(id.getId());
+      if (containerInfo.getAllocatedBytes() + size
+          <= this.containerSize) {
+        containerInfo.updateLastUsedTime();
+
+        ContainerState key = new ContainerState(owner,
+            containerInfo.getPipeline().getType(),
+            containerInfo.getPipeline().getFactor());
+        lastUsedMap.put(key, containerInfo.containerID());
+        return containerInfo;
+      }
     }
+    return null;
+  }
+
+  /**
+   * Returns a set of ContainerIDs that match the Container.
+   *
+   * @param owner  Owner of the Containers.
+   * @param type - Replication Type of the containers
+   * @param factor - Replication factor of the containers.
+   * @param state - Current State, like Open, Close etc.
+   * @return Set of containers that match the specific query parameters.
+   */
+  public NavigableSet<ContainerID> getMatchingContainerIDs(
+      String owner, ReplicationType type, ReplicationFactor factor,
+      LifeCycleState state) {
+    return containers.getMatchingContainerIDs(state, owner,
+        factor, type);
   }
+
+  @Override
+  public void close() throws IOException {
+  }
+
 }
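
The rewritten getMatchingContainer above replaces the per-state priority
queues with a round-robin walk over a sorted set of container IDs: search
after the last used ID (exclusive tailSet) and, if nothing past it has space,
wrap around to the head of the set (inclusive headSet). The following
standalone sketch shows just that selection pattern over plain longs; it has
no SCM dependencies and the names in it are hypothetical.

    import java.util.Arrays;
    import java.util.NavigableSet;
    import java.util.TreeSet;
    import java.util.function.Predicate;

    public final class RoundRobinSelectionSketch {

      // Start after the last used ID; if nothing past it qualifies, wrap
      // around to the head of the set so a lone matching entry is still
      // considered.
      static Long selectNext(NavigableSet<Long> matching, Long lastUsed,
          Predicate<Long> hasSpace) {
        if (matching.isEmpty()) {
          return null;
        }
        Long start = (lastUsed == null) ? matching.first() : lastUsed;
        for (Long id : matching.tailSet(start, false)) {
          if (hasSpace.test(id)) {
            return id;
          }
        }
        for (Long id : matching.headSet(start, true)) {
          if (hasSpace.test(id)) {
            return id;
          }
        }
        return null;
      }

      public static void main(String[] args) {
        NavigableSet<Long> ids = new TreeSet<>(Arrays.asList(1L, 2L, 3L));
        System.out.println(selectNext(ids, 2L, id -> true)); // 3
        System.out.println(selectNext(ids, 3L, id -> true)); // wraps to 1
      }
    }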

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java
new file mode 100644
index 0000000..1372e7f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerAttribute.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.hadoop.ozone.scm.container.ContainerStates;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.scm.exceptions.SCMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_CHANGE_CONTAINER_STATE;
+
+/**
+ * Each Attribute that we manage for a container is maintained as a map.
+ * <p>
+ * Currently we manage the following attributes for a container.
+ * <p>
+ * 1. StateMap - LifeCycleState -> Set of ContainerIDs
+ * 2. TypeMap  - ReplicationType -> Set of ContainerIDs
+ * 3. OwnerMap - OwnerNames -> Set of ContainerIDs
+ * 4. FactorMap - ReplicationFactor -> Set of ContainerIDs
+ * <p>
+ * This means that for a cluster size of 750 PB -- we will have around 150
+ * Million containers, if we assume 5GB average container size.
+ * <p>
+ * That implies that these maps will take around 2/3 GB of RAM which will be
+ * pinned down in the SCM. This is deemed acceptable since we can tune the
+ * container size -- say we make it 10GB on average, then we can deal with a
+ * cluster size of 1.5 exabytes with the same metadata in SCM's memory.
+ * <p>
+ * Please note: **This class is not thread safe**. It used to be thread
+ * safe, but while benchmarking we found that ContainerStateMap would take
+ * 5 locks for a single container insert. If we remove the locks in this
+ * class, we are able to perform about 540K operations per second; with the
+ * locks in this class it goes down to 246K operations per second. Hence we
+ * rely on the ContainerStateMap locks to maintain the consistency of data
+ * in these classes too, since ContainerAttribute is only used by the
+ * ContainerStateMap class.
+ */
+public class ContainerAttribute<T> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerAttribute.class);
+
+  private final Map<T, NavigableSet<ContainerID>> attributeMap;
+  private static final NavigableSet<ContainerID> EMPTY_SET =  Collections
+      .unmodifiableNavigableSet(new TreeSet<>());
+
+  /**
+   * Creates a Container Attribute map from an existing Map.
+   *
+   * @param attributeMap - AttributeMap
+   */
+  public ContainerAttribute(Map<T, NavigableSet<ContainerID>> attributeMap) {
+    this.attributeMap = attributeMap;
+  }
+
+  /**
+   * Create an empty Container Attribute map.
+   */
+  public ContainerAttribute() {
+    this.attributeMap = new HashMap<>();
+  }
+
+  /**
+   * Insert or update the value in the Attribute map.
+   *
+   * @param key - The key to the set where the ContainerID should exist.
+   * @param value - Actual Container ID.
+   * @throws SCMException - on Error
+   */
+  public boolean insert(T key, ContainerID value) throws SCMException {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(value);
+
+    if (attributeMap.containsKey(key)) {
+      if (attributeMap.get(key).add(value)) {
+        return true; // we inserted the value as it doesn't exist in the set.
+      } else { // Failure indicates that this ContainerID exists in the Set
+        if (!attributeMap.get(key).remove(value)) {
+          LOG.error("Failure to remove the object from the Map.Key:{}, " +
+              "ContainerID: {}", key, value);
+          throw new SCMException("Failure to remove the object from the Map",
+              FAILED_TO_CHANGE_CONTAINER_STATE);
+        }
+        attributeMap.get(key).add(value);
+        return true;
+      }
+    } else {
+      // This key does not exist, we need to allocate this key in the map.
+      // TODO: Replace TreeSet with FoldedTreeSet from HDFS Utils.
+      // Skipping for now, since FoldedTreeSet does not have implementations
+      // for headSet and TailSet. We need those calls.
+      this.attributeMap.put(key, new TreeSet<>());
+      // This should not fail, we just allocated this object.
+      attributeMap.get(key).add(value);
+      return true;
+    }
+  }
+
+  /**
+   * Returns true if we have this bucket in the attribute map.
+   *
+   * @param key - Key to lookup
+   * @return true if we have the key
+   */
+  public boolean hasKey(T key) {
+    Preconditions.checkNotNull(key);
+    return this.attributeMap.containsKey(key);
+  }
+
+  /**
+   * Returns true if we have the key and the containerID in the bucket.
+   *
+   * @param key - Key to the bucket
+   * @param id - container ID that we want to lookup
+   * @return true or false
+   */
+  public boolean hasContainerID(T key, ContainerID id) {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(id);
+
+    return this.attributeMap.containsKey(key) &&
+        this.attributeMap.get(key).contains(id);
+  }
+
+  /**
+   * Returns true if we have the key and the containerID in the bucket.
+   *
+   * @param key - Key to the bucket
+   * @param id - container ID that we want to lookup
+   * @return true or false
+   */
+  public boolean hasContainerID(T key, int id) {
+    return hasContainerID(key, ContainerID.valueof(id));
+  }
+
+  /**
+   * Clears all entries for this key type.
+   *
+   * @param key - Key that identifies the Set.
+   */
+  public void clearSet(T key) {
+    Preconditions.checkNotNull(key);
+
+    if (attributeMap.containsKey(key)) {
+      attributeMap.get(key).clear();
+    } else {
+      LOG.debug("key: {} does not exist in the attributeMap", key);
+    }
+  }
+
+  /**
+   * Removes a container ID from the set pointed by the key.
+   *
+   * @param key - key to identify the set.
+   * @param value - Container ID
+   */
+  public boolean remove(T key, ContainerID value) {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(value);
+
+    if (attributeMap.containsKey(key)) {
+      if (!attributeMap.get(key).remove(value)) {
+        LOG.debug("ContainerID: {} does not exist in the set pointed by " +
+            "key:{}", value, key);
+        return false;
+      }
+      return true;
+    } else {
+      LOG.debug("key: {} does not exist in the attributeMap", key);
+      return false;
+    }
+  }
+
+  /**
+   * Returns the collection that maps to the given key.
+   *
+   * @param key - Key to the bucket.
+   * @return Underlying Set in immutable form.
+   */
+  public NavigableSet<ContainerID> getCollection(T key) {
+    Preconditions.checkNotNull(key);
+
+    if (this.attributeMap.containsKey(key)) {
+      return Collections.unmodifiableNavigableSet(this.attributeMap.get(key));
+    }
+    LOG.debug("No such Key. Key {}", key);
+    return EMPTY_SET;
+  }
+
+  /**
+   * Moves a ContainerID from one bucket to another.
+   *
+   * @param currentKey - Key of the bucket that currently holds the ID.
+   * @param newKey - Key of the destination bucket.
+   * @param value - ContainerID
+   * @throws SCMException on Error
+   */
+  public void update(T currentKey, T newKey, ContainerID value)
+      throws SCMException {
+    Preconditions.checkNotNull(currentKey);
+    Preconditions.checkNotNull(newKey);
+
+    boolean removed = false;
+    try {
+      removed = remove(currentKey, value);
+      if (!removed) {
+        throw new SCMException("Unable to find key in the current key bucket",
+            FAILED_TO_CHANGE_CONTAINER_STATE);
+      }
+      insert(newKey, value);
+    } catch (SCMException ex) {
+      // If we removed the value, insert it back into the original bucket,
+      // since the subsequent insert failed.
+      LOG.error("Error in update.", ex);
+      if (removed) {
+        insert(currentKey, value);
+        LOG.trace("Reinserted the removed value into bucket: {}", currentKey);
+      }
+      throw ex;
+    }
+  }
+}
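
For readers new to this class, a minimal usage sketch of ContainerAttribute
(not part of the patch): it assumes the OzoneProtos.LifeCycleState enum
exposes OPEN and CLOSED constants and that the snippet runs inside a method
that declares "throws SCMException"; ContainerID and NavigableSet come from
the imports already used elsewhere in this patch.

    ContainerAttribute<LifeCycleState> stateAttr = new ContainerAttribute<>();

    // Index two container IDs under the OPEN bucket.
    stateAttr.insert(LifeCycleState.OPEN, ContainerID.valueof(1));
    stateAttr.insert(LifeCycleState.OPEN, ContainerID.valueof(2));

    // Membership checks against a bucket.
    boolean hasBucket = stateAttr.hasKey(LifeCycleState.OPEN);         // true
    boolean hasId = stateAttr.hasContainerID(LifeCycleState.OPEN, 1);  // true

    // Move container 1 from OPEN to CLOSED; if the second half of the move
    // fails, update() re-inserts the ID into the original bucket.
    stateAttr.update(LifeCycleState.OPEN, LifeCycleState.CLOSED,
        ContainerID.valueof(1));

    // Read-only view of a bucket; an empty immutable set if the key is absent.
    NavigableSet<ContainerID> open = stateAttr.getCollection(LifeCycleState.OPEN);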

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java
new file mode 100644
index 0000000..6c492ff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerState.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.scm.container.ContainerStates;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+
+/**
+ * Class that acts as the container state.
+ */
+public class ContainerState {
+  private final OzoneProtos.ReplicationType type;
+  private final String owner;
+  private final OzoneProtos.ReplicationFactor replicationFactor;
+
+  /**
+   * Constructs a ContainerState, which acts as a composite container key.
+   *
+   * @param owner - Container Owner.
+   * @param type - Replication Type.
+   * @param factor - Replication Factor.
+   */
+  public ContainerState(String owner, OzoneProtos.ReplicationType type,
+      OzoneProtos.ReplicationFactor factor) {
+    this.type = type;
+    this.owner = owner;
+    this.replicationFactor = factor;
+  }
+
+
+  public OzoneProtos.ReplicationType getType() {
+    return type;
+  }
+
+  public String getOwner() {
+    return owner;
+  }
+
+  public OzoneProtos.ReplicationFactor getFactor() {
+    return replicationFactor;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    ContainerState that = (ContainerState) o;
+
+    return new EqualsBuilder()
+        .append(type, that.type)
+        .append(owner, that.owner)
+        .append(replicationFactor, that.replicationFactor)
+        .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(137, 757)
+        .append(type)
+        .append(owner)
+        .append(replicationFactor)
+        .toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return "ContainerKey{" +
+        ", type=" + type +
+        ", owner=" + owner +
+        ", replicationFactor=" + replicationFactor +
+        '}';
+  }
+}
\ No newline at end of file
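
Because equals() and hashCode() are built from the owner, type and factor
fields, ContainerState can serve as a composite key in hash-based
collections. A small sketch (not part of the patch; "ksm-1" is an
illustrative owner name, and the enum constants are the same ones used
elsewhere in this diff):

    // Two keys with identical (owner, type, factor) are equal and hash the
    // same, so they collapse to a single map entry.
    ContainerState k1 = new ContainerState("ksm-1",
        OzoneProtos.ReplicationType.STAND_ALONE,
        OzoneProtos.ReplicationFactor.ONE);
    ContainerState k2 = new ContainerState("ksm-1",
        OzoneProtos.ReplicationType.STAND_ALONE,
        OzoneProtos.ReplicationFactor.ONE);

    Map<ContainerState, Long> containerCountByKey = new HashMap<>();
    containerCountByKey.merge(k1, 1L, Long::sum);
    containerCountByKey.merge(k2, 1L, Long::sum);
    // containerCountByKey.get(k1) == 2, because equals()/hashCode() compare
    // the owner, type and factor fields rather than object identity.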

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java
new file mode 100644
index 0000000..eebc6be
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/ContainerStateMap.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.scm.container.ContainerStates;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.scm.exceptions.SCMException;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+    .CONTAINER_EXISTS;
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_CHANGE_CONTAINER_STATE;
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_FIND_CONTAINER;
+
+/**
+ * Container State Map acts like a unified map for various attributes that are
+ * used to select containers when we need allocated blocks.
+ * <p>
+ * This class provides the ability to query 4 classes of attributes. They are
+ * <p>
+ * 1. LifeCycleStates - The lifecycle state describes which state a container
+ * is in. For example, a container needs to be in the Open state for a
+ * client to be able to write to it.
+ * <p>
+ * 2. Owners - Each instance of a name service, for example, a Namenode of
+ * HDFS, a Key Space Manager (KSM) of Ozone or a CBlockServer, is an owner.
+ * It is possible to have many KSMs for an Ozone cluster and only one SCM,
+ * but SCM keeps the data from each KSM in a separate bucket, never mixing
+ * them. To write data, we often have to find all open containers for a
+ * specific owner.
+ * <p>
+ * 3. ReplicationType - Clients are allowed to specify what kind of
+ * replication pipeline they want to use. Each container exists on top of a
+ * pipeline, so we need to track the ReplicationType specified by the user.
+ * <p>
+ * 4. ReplicationFactor - The replication factor represents how many copies
+ * of the data should be made. Right now we support two factors, ONE replica
+ * and THREE replicas. A user can specify how many copies should be made for
+ * an Ozone key.
+ * <p>
+ * The most common access pattern of this class is to select a container
+ * based on all these parameters, for example, when allocating a block we
+ * will select a container that belongs to user1 with Ratis replication,
+ * which makes three copies of the data. By default we look for open
+ * containers; if we cannot find any, we add new containers.
+ */
+public class ContainerStateMap {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerStateMap.class);
+
+  private final ContainerAttribute<LifeCycleState> lifeCycleStateMap;
+  private final ContainerAttribute<String> ownerMap;
+  private final ContainerAttribute<ReplicationFactor> factorMap;
+  private final ContainerAttribute<ReplicationType> typeMap;
+
+  private final Map<ContainerID, ContainerInfo> containerMap;
+  private final static NavigableSet<ContainerID> EMPTY_SET  =
+      Collections.unmodifiableNavigableSet(new TreeSet<>());
+
+  // The ContainerStateMap lock should be held before updating the
+  // ContainerAttributes. The consistency of the ContainerAttributes is
+  // protected by this lock.
+  private final AutoCloseableLock autoLock;
+
+  /**
+   * Create a ContainerStateMap.
+   */
+  public ContainerStateMap() {
+    lifeCycleStateMap = new ContainerAttribute<>();
+    ownerMap = new ContainerAttribute<>();
+    factorMap = new ContainerAttribute<>();
+    typeMap = new ContainerAttribute<>();
+    containerMap = new HashMap<>();
+    autoLock = new AutoCloseableLock();
+//        new InstrumentedLock(getClass().getName(), LOG,
+//            new ReentrantLock(),
+//            1000,
+//            300));
+  }
+
+  /**
+   * Adds a ContainerInfo Entry in the ContainerStateMap.
+   *
+   * @param info - container info
+   * @throws SCMException - throws if create failed.
+   */
+  public void addContainer(ContainerInfo info)
+      throws SCMException {
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      ContainerID id = ContainerID.valueof(info.getContainerID());
+      if (containerMap.putIfAbsent(id, info) != null) {
+        LOG.debug("Duplicate container ID detected. {}", id);
+        throw new
+            SCMException("Duplicate container ID detected.",
+            CONTAINER_EXISTS);
+      }
+
+      lifeCycleStateMap.insert(info.getState(), id);
+      ownerMap.insert(info.getOwner(), id);
+      factorMap.insert(info.getPipeline().getFactor(), id);
+      typeMap.insert(info.getPipeline().getType(), id);
+      LOG.trace("Created container with {} successfully.", id);
+    }
+  }
+
+  /**
+   * Returns the latest state of Container from SCM's Container State Map.
+   *
+   * @param info - ContainerInfo
+   * @return ContainerInfo
+   */
+  public ContainerInfo getContainerInfo(ContainerInfo info) {
+    return getContainerInfo(info.getContainerID());
+  }
+
+  /**
+   * Returns the latest state of Container from SCM's Container State Map.
+   *
+   * @param containerID - long
+   * @return container info, if found.
+   */
+  public ContainerInfo getContainerInfo(long containerID) {
+    ContainerID id = new ContainerID(containerID);
+    return containerMap.get(id);
+  }
+
+  /**
+   * Returns the full container Map.
+   *
+   * @return - Map
+   */
+  public Map<ContainerID, ContainerInfo> getContainerMap() {
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      return Collections.unmodifiableMap(containerMap);
+    }
+  }
+
+  /**
+   * Updates the stored ContainerInfo of an existing container.
+   *
+   * @param info ContainerInfo.
+   * @throws SCMException if the container does not exist.
+   */
+  public void updateContainerInfo(ContainerInfo info) throws SCMException {
+    Preconditions.checkNotNull(info);
+    ContainerInfo currentInfo = null;
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      currentInfo = containerMap.get(
+          ContainerID.valueof(info.getContainerID()));
+
+      if (currentInfo == null) {
+        throw new SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
+      }
+      containerMap.put(info.containerID(), info);
+    }
+  }
+
+  /**
+   * Update the State of a container.
+   *
+   * @param info - ContainerInfo
+   * @param currentState - CurrentState
+   * @param newState - NewState.
+   * @throws SCMException - in case of failure.
+   */
+  public void updateState(ContainerInfo info, LifeCycleState currentState,
+      LifeCycleState newState) throws SCMException {
+    Preconditions.checkNotNull(currentState);
+    Preconditions.checkNotNull(newState);
+
+    ContainerID id = new ContainerID(info.getContainerID());
+    ContainerInfo currentInfo = null;
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      currentInfo = containerMap.get(id);
+
+      if (currentInfo == null) {
+        throw new
+            SCMException("No such container.", FAILED_TO_FIND_CONTAINER);
+      }
+      // This operation updates two places and each update can fail
+      // independently, so the code needs to handle partial failure.
+
+      // We first record the new state in the container map and then update
+      // the lifecycle attribute map. If that second update throws, the catch
+      // block below attempts to roll back the changes already made. If the
+      // rollback also fails, the two structures can be left in an
+      // inconsistent state.
+
+      info.setState(newState);
+      containerMap.put(id, info);
+      lifeCycleStateMap.update(currentState, newState, id);
+      LOG.trace("Updated the container {} to new state. Old = {}, new = " +
+          "{}", id, currentState, newState);
+    } catch (SCMException ex) {
+      LOG.error("Unable to update the container state. {}", ex);
+      // we need to revert the change in this attribute since we are not
+      // able to update the hash table.
+      LOG.info("Reverting the update to lifecycle state. Moving back to " +
+              "old state. Old = {}, Attempted state = {}", currentState,
+          newState);
+
+      containerMap.put(id, currentInfo);
+
+      // If this line throws, the state map can be left in an inconsistent
+      // state, since the attribute map and the container map will no longer
+      // be in sync with each other.
+      lifeCycleStateMap.update(newState, currentState, id);
+
+      throw new SCMException("Updating the container map failed.", ex,
+          FAILED_TO_CHANGE_CONTAINER_STATE);
+    }
+  }
+
+  /**
+   * Returns the set of containers owned by a name service.
+   *
+   * @param ownerName - Name of the NameService.
+   * @return - NavigableSet of ContainerIDs.
+   */
+  NavigableSet<ContainerID> getContainerIDsByOwner(String ownerName) {
+    Preconditions.checkNotNull(ownerName);
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      return ownerMap.getCollection(ownerName);
+    }
+  }
+
+  /**
+   * Returns the containers in the system that use the given replication type.
+   *
+   * @param type - Replication type -- StandAlone, Ratis etc.
+   * @return NavigableSet
+   */
+  NavigableSet<ContainerID> getContainerIDsByType(ReplicationType type) {
+    Preconditions.checkNotNull(type);
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      return typeMap.getCollection(type);
+    }
+  }
+
+  /**
+   * Returns Containers by replication factor.
+   *
+   * @param factor - Replication Factor.
+   * @return NavigableSet.
+   */
+  NavigableSet<ContainerID> getContainerIDsByFactor(ReplicationFactor factor) {
+    Preconditions.checkNotNull(factor);
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      return factorMap.getCollection(factor);
+    }
+  }
+
+  /**
+   * Returns containers by lifecycle state.
+   *
+   * @param state - State - Open, Closed etc.
+   * @return NavigableSet of ContainerIDs in the given state.
+   */
+  NavigableSet<ContainerID> getContainerIDsByState(LifeCycleState state) {
+    Preconditions.checkNotNull(state);
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+      return lifeCycleStateMap.getCollection(state);
+    }
+  }
+
+  /**
+   * Gets the containers that match all of the following filters.
+   *
+   * @param state - LifeCycleState
+   * @param owner - Owner
+   * @param factor - Replication Factor
+   * @param type - Replication Type
+   * @return NavigableSet of matching ContainerIDs; empty if no container
+   * satisfies the criteria.
+   */
+  public NavigableSet<ContainerID> getMatchingContainerIDs(
+      LifeCycleState state, String owner,
+      ReplicationFactor factor, ReplicationType type) {
+
+    Preconditions.checkNotNull(state, "State cannot be null");
+    Preconditions.checkNotNull(owner, "Owner cannot be null");
+    Preconditions.checkNotNull(factor, "Factor cannot be null");
+    Preconditions.checkNotNull(type, "Type cannot be null");
+
+    try (AutoCloseableLock lock = autoLock.acquire()) {
+
+      // If any one condition cannot be met we return EMPTY_SET immediately,
+      // since the intersection of these sets is empty if any one of them
+      // is empty.
+      NavigableSet<ContainerID> stateSet =
+          lifeCycleStateMap.getCollection(state);
+      if (stateSet.size() == 0) {
+        return EMPTY_SET;
+      }
+
+      NavigableSet<ContainerID> ownerSet = ownerMap.getCollection(owner);
+      if (ownerSet.size() == 0) {
+        return EMPTY_SET;
+      }
+
+      NavigableSet<ContainerID> factorSet = factorMap.getCollection(factor);
+      if (factorSet.size() == 0) {
+        return EMPTY_SET;
+      }
+
+      NavigableSet<ContainerID> typeSet = typeMap.getCollection(type);
+      if (typeSet.size() == 0) {
+        return EMPTY_SET;
+      }
+
+      // If we add more constraints, we will just add those sets here.
+      NavigableSet<ContainerID>[] sets = sortBySize(stateSet,
+          ownerSet, factorSet, typeSet);
+
+      NavigableSet<ContainerID> currentSet = sets[0];
+      // We take the smallest set and intersect against the larger sets. This
+      // allows us to reduce the lookups to the least possible number.
+      for (int x = 1; x < sets.length; x++) {
+        currentSet = intersectSets(currentSet, sets[x]);
+      }
+      return currentSet;
+    }
+  }
+
+  /**
+   * Calculates the intersection between sets and returns a new set.
+   *
+   * @param smaller - First Set
+   * @param bigger - Second Set
+   * @return resultSet which is the intersection of these two sets.
+   */
+  private NavigableSet<ContainerID> intersectSets(
+      NavigableSet<ContainerID> smaller,
+      NavigableSet<ContainerID> bigger) {
+    Preconditions.checkState(smaller.size() <= bigger.size(),
+        "This function assumes the first set is lesser or equal to second " +
+            "set");
+    NavigableSet<ContainerID> resultSet = new TreeSet<>();
+    for (ContainerID id : smaller) {
+      if (bigger.contains(id)) {
+        resultSet.add(id);
+      }
+    }
+    return resultSet;
+  }
+
+  /**
+   * Sorts an array of sets by size. This is useful when we are
+   * intersecting the sets.
+   *
+   * @param sets - varargs of sets.
+   * @return a sorted array of sets, ordered by the size of each set.
+   */
+  @SuppressWarnings("unchecked")
+  private NavigableSet<ContainerID>[] sortBySize(
+      NavigableSet<ContainerID>... sets) {
+    for (int x = 0; x < sets.length - 1; x++) {
+      for (int y = 0; y < sets.length - x - 1; y++) {
+        if (sets[y].size() > sets[y + 1].size()) {
+          NavigableSet<ContainerID> temp = sets[y];
+          sets[y] = sets[y + 1];
+          sets[y + 1] = temp;
+        }
+      }
+    }
+    return sets;
+  }
+}
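
Tying the pieces together, a rough sketch of how the new state map is meant
to be driven (not part of the patch): it assumes an already-constructed
ContainerInfo named "info" whose pipeline carries the replication type and
factor, an illustrative owner name "ksm-1", OPEN/CLOSED constants on
LifeCycleState, and a method that declares "throws SCMException".

    ContainerStateMap stateMap = new ContainerStateMap();

    // Register the container; this also indexes it by state, owner, factor
    // and type so the query below can find it.
    stateMap.addContainer(info);

    // Typical block-allocation query: every container that is OPEN, owned by
    // "ksm-1", STAND_ALONE and single-replica. Internally the smallest of the
    // four attribute sets is intersected against the other three.
    NavigableSet<ContainerID> candidates = stateMap.getMatchingContainerIDs(
        LifeCycleState.OPEN, "ksm-1",
        ReplicationFactor.ONE, ReplicationType.STAND_ALONE);

    // Lifecycle transition: updates the unified map and moves the ID between
    // lifecycle buckets, attempting to roll back if the second step fails.
    stateMap.updateState(info, LifeCycleState.OPEN, LifeCycleState.CLOSED);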

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java
new file mode 100644
index 0000000..6a7e663
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStates/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+/**
+ * Container States management package.
+ */
+package org.apache.hadoop.ozone.scm.container.ContainerStates;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index 9a82c94..6f2717c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -103,6 +103,7 @@ message ContainerInfo {
   optional int64 writeCount = 7;
   optional int64 readBytes = 8;
   optional int64 writeBytes = 9;
+  required int64 containerID = 10;
 }
 
 // The deleted blocks which are stored in deletedBlock.db of scm.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index e1df1b5..e76f986 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -122,6 +122,8 @@ public final class ContainerTestHelper {
     final Pipeline pipeline = new Pipeline(leader.getDatanodeUuid());
     pipeline.setContainerName(containerName);
     pipeline.addMember(leader);
+    pipeline.setFactor(OzoneProtos.ReplicationFactor.ONE);
+    pipeline.setType(OzoneProtos.ReplicationType.STAND_ALONE);
 
     for(; i.hasNext();) {
       pipeline.addMember(i.next());
@@ -346,7 +348,7 @@ public final class ContainerTestHelper {
    */
   public static ContainerCommandRequestProto getCreateContainerRequest(
       String containerName, Pipeline pipeline) throws IOException {
-    LOG.trace("createContainer: {}", containerName);
+    LOG.trace("addContainer: {}", containerName);
 
     ContainerProtos.CreateContainerRequestProto.Builder createRequest =
         ContainerProtos.CreateContainerRequestProto

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java
index bbe24cf..46a3821 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java
@@ -72,14 +72,17 @@ public class ReplicationDatanodeStateManager {
           "required container reports");
     }
 
+    int containerID = 1;
     while (containerList.size() < dataNodeCount && nodesInPool.size() > 0) {
       DatanodeID id = nodesInPool.get(r.nextInt(nodesInPool.size()));
       nodesInPool.remove(id);
+      containerID++;
       // We return container reports only for nodes that are healthy.
       if (nodeManager.getNodeState(id) == HEALTHY) {
         ContainerInfo info = ContainerInfo.newBuilder()
             .setContainerName(containerName)
             .setFinalhash(DigestUtils.sha256Hex(containerName))
+            .setContainerID(containerID)
             .build();
         ContainerReportsRequestProto containerReport =
             ContainerReportsRequestProto.newBuilder().addReports(info)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7955ee40/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index fc5f25f..6ff08d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -448,6 +448,8 @@ public class TestEndPoint {
       report.setReadBytes(OzoneConsts.GB * 1);
       report.setWriteCount(50);
       report.setWriteBytes(OzoneConsts.GB * 2);
+      report.setContainerID(1);
+
       reportsBuilder.addReports(report.getProtoBufMessage());
     }
     reportsBuilder.setDatanodeID(SCMTestUtils.getDatanodeID()

