Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/07/14 18:24:33 UTC

hadoop git commit: HDFS-11493. Ozone: SCM: Add the ability to handle container reports. Contributed by Anu Engineer.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 5341fa13a -> 8d37ef30e


HDFS-11493. Ozone: SCM: Add the ability to handle container reports. Contributed by Anu Engineer.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8d37ef30
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8d37ef30
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8d37ef30

Branch: refs/heads/HDFS-7240
Commit: 8d37ef30e10c23a87388e64f6117a76ec68de08a
Parents: 5341fa1
Author: Anu Engineer <ae...@apache.org>
Authored: Fri Jul 14 11:18:36 2017 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Fri Jul 14 11:18:36 2017 -0700

----------------------------------------------------------------------
 .../hadoop/util/concurrent/HadoopExecutors.java |  41 +++
 .../org/apache/hadoop/scm/ScmConfigKeys.java    |  23 ++
 .../ContainerReplicationManager.java            | 296 +++++++++++++++++
 .../container/replication/InProgressPool.java   | 302 ++++++++++++++++++
 .../scm/container/replication/PeriodicPool.java | 119 +++++++
 .../scm/container/replication/package-info.java |  23 ++
 .../hadoop/ozone/scm/node/CommandQueue.java     | 126 +++++++-
 .../hadoop/ozone/scm/node/NodeManager.java      |  10 +-
 .../hadoop/ozone/scm/node/SCMNodeManager.java   |  37 +++
 .../StorageContainerDatanodeProtocol.proto      |   5 +-
 .../ReplicationDatanodeStateManager.java        |  92 ++++++
 .../TestUtils/ReplicationNodeManagerMock.java   | 315 +++++++++++++++++++
 .../ReplicationNodePoolManagerMock.java         | 132 ++++++++
 .../ozone/container/TestUtils/package-info.java |  18 ++
 .../ozone/container/common/SCMTestUtils.java    |   8 +-
 .../ozone/container/common/TestEndPoint.java    |  12 +-
 .../common/impl/TestContainerPersistence.java   |   5 +-
 .../TestContainerReplicationManager.java        | 259 +++++++++++++++
 .../container/replication/package-info.java     |  18 ++
 .../apache/hadoop/ozone/scm/TestSCMMXBean.java  |   5 +-
 .../ozone/scm/container/MockNodeManager.java    |  11 +
 21 files changed, 1831 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java
index 1bc6976..4b725f0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java
@@ -20,6 +20,8 @@
 
 package org.apache.hadoop.util.concurrent;
 
+import org.slf4j.Logger;
+
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -41,6 +43,14 @@ public final class HadoopExecutors {
         threadFactory);
   }
 
+  public static ExecutorService newCachedThreadPool(ThreadFactory
+      threadFactory, int maxThreads) {
+    return new HadoopThreadPoolExecutor(0, maxThreads,
+        60L, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(),
+        threadFactory);
+  }
+
   public static ExecutorService newFixedThreadPool(int nThreads,
       ThreadFactory threadFactory) {
     return new HadoopThreadPoolExecutor(nThreads, nThreads,
@@ -91,6 +101,37 @@ public final class HadoopExecutors {
     return Executors.newSingleThreadScheduledExecutor(threadFactory);
   }
 
+  /**
+   * Helper routine to shut down an ExecutorService.
+   * @param executorService - the executor service to shut down
+   * @param logger - Logger
+   * @param timeout - Timeout
+   * @param unit - TimeUnit, generally seconds.
+   */
+  public static void shutdown(ExecutorService executorService, Logger logger,
+      long timeout, TimeUnit unit) {
+    try {
+      if (executorService != null) {
+        executorService.shutdown();
+        try {
+          if (!executorService.awaitTermination(timeout, unit)) {
+            executorService.shutdownNow();
+          }
+
+          if (!executorService.awaitTermination(timeout, unit)) {
+            logger.error("Unable to shut down properly.");
+          }
+        } catch (InterruptedException e) {
+          logger.error("Error attempting to shut down.", e);
+          executorService.shutdownNow();
+        }
+      }
+    } catch (Exception e) {
+      logger.error("Error during shutdown: ", e);
+      throw e;
+    }
+  }
+
   //disable instantiation
   private HadoopExecutors() { }
 }
\ No newline at end of file

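As a rough usage sketch of the two helpers added above (the pool bound,
thread name, and task are illustrative, not part of the patch):

  import com.google.common.util.concurrent.ThreadFactoryBuilder;
  import org.apache.hadoop.util.concurrent.HadoopExecutors;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.TimeUnit;

  public class ShutdownSketch {
    private static final Logger LOG =
        LoggerFactory.getLogger(ShutdownSketch.class);

    public static void main(String[] args) {
      // Cached pool with an upper bound on the number of worker threads.
      ExecutorService pool = HadoopExecutors.newCachedThreadPool(
          new ThreadFactoryBuilder().setDaemon(true)
              .setNameFormat("sketch-worker-%d").build(), 8);
      pool.submit(() -> LOG.info("work item ran"));
      // Two-phase shutdown: orderly first, then shutdownNow on timeout.
      HadoopExecutors.shutdown(pool, LOG, 5, TimeUnit.SECONDS);
    }
  }
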
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index 1c61feb..73d81f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -191,6 +191,29 @@ public final class ScmConfigKeys {
 
 
   /**
+   * Don't start processing a pool until a minimum number of seconds has
+   * elapsed since the last time it was processed.
+   */
+  public static final String
+      OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_SECONDS =
+      "ozone.scm.container.report.processing.interval.seconds";
+  public static final int
+      OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT = 60;
+
+  /**
+   * These two settings control the number of threads in the executor pool
+   * and the timeout for container reports from all nodes.
+   */
+  public static final String OZONE_SCM_MAX_CONTAINER_REPORT_THREADS =
+      "ozone.scm.max.container.report.threads";
+  public static final int OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT = 100;
+  public static final String OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS =
+      "ozone.scm.container.reports.wait.timeout.seconds";
+  public static final int OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT =
+      300; // Default 5 minute wait.
+
+
+  /**
    * Never constructed.
    */
   private ScmConfigKeys() {

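A minimal sketch of overriding the new keys programmatically (the values
are illustrative; the patch defaults are 60 seconds, 100 threads, and 300
seconds):

  import org.apache.hadoop.ozone.OzoneConfiguration;
  import org.apache.hadoop.scm.ScmConfigKeys;

  public class ScmReportConfigSketch {
    public static void main(String[] args) {
      OzoneConfiguration conf = new OzoneConfiguration();
      // Wait at least two minutes between runs of the same pool.
      conf.setInt(
          ScmConfigKeys.OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_SECONDS,
          120);
      // Cap the report-processing executor at 50 threads.
      conf.setInt(ScmConfigKeys.OZONE_SCM_MAX_CONTAINER_REPORT_THREADS, 50);
      // Give up waiting for a pool's reports after ten minutes.
      conf.setInt(
          ScmConfigKeys.OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS, 600);
    }
  }
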
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
new file mode 100644
index 0000000..71836db
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/ContainerReplicationManager.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.container.replication;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.scm.node.CommandQueue;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.util.concurrent.Uninterruptibles
+    .sleepUninterruptibly;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_SECONDS;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS;
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT;
+
+/**
+ * This class takes a set of container reports that belong to a pool and then
+ * computes the replication levels for each container.
+ */
+public class ContainerReplicationManager implements Closeable {
+  static final Logger LOG =
+      LoggerFactory.getLogger(ContainerReplicationManager.class);
+
+  private final NodePoolManager poolManager;
+  private final CommandQueue commandQueue;
+  private final HashSet<String> poolNames;
+  private final PriorityQueue<PeriodicPool> poolQueue;
+  private final NodeManager nodeManager;
+  private final int containerProcessingLag;
+  private final AtomicBoolean runnable;
+  private final ExecutorService executorService;
+  private final int maxPoolWait;
+  private long poolProcessCount;
+  private final List<InProgressPool> inProgressPoolList;
+  private final AtomicInteger threadFaultCount;
+
+  /**
+   * Returns the number of times we have processed pools.
+   * @return long
+   */
+  public long getPoolProcessCount() {
+    return poolProcessCount;
+  }
+
+
+  /**
+   * Constructs a class that computes Replication Levels.
+   *
+   * @param conf - OzoneConfiguration
+   * @param nodeManager - Node Manager
+   * @param poolManager - Pool Manager
+   * @param commandQueue - Datanodes Command Queue.
+   */
+  public ContainerReplicationManager(OzoneConfiguration conf,
+      NodeManager nodeManager, NodePoolManager poolManager,
+      CommandQueue commandQueue) {
+    Preconditions.checkNotNull(poolManager);
+    Preconditions.checkNotNull(commandQueue);
+    Preconditions.checkNotNull(nodeManager);
+    this.containerProcessingLag =
+        conf.getInt(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_SECONDS,
+            OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT) * 1000;
+    int maxContainerReportThreads =
+        conf.getInt(OZONE_SCM_MAX_CONTAINER_REPORT_THREADS,
+            OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT);
+    this.maxPoolWait =
+        conf.getInt(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS,
+            OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT) * 1000;
+    this.poolManager = poolManager;
+    this.commandQueue = commandQueue;
+    this.nodeManager = nodeManager;
+    this.poolNames = new HashSet<>();
+    this.poolQueue = new PriorityQueue<>();
+    runnable = new AtomicBoolean(true);
+    this.threadFaultCount = new AtomicInteger(0);
+    executorService = HadoopExecutors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Container Reports Processing Thread - %d")
+            .build(), maxContainerReportThreads);
+    inProgressPoolList = new LinkedList<>();
+
+    initPoolProcessThread();
+  }
+
+  /**
+   * Returns the number of pools that are being processed right now.
+   * @return int - Number of pools currently in progress.
+   */
+  public int getInProgressPoolCount() {
+    return inProgressPoolList.size();
+  }
+
+  /**
+   * Signals the background thread to exit.
+   */
+  public void setExit() {
+    this.runnable.set(false);
+  }
+
+  /**
+   * Adds or removes pools from the set of pool names that we need to
+   * process.
+   *
+   * There are two cases to handle: pools that have been added since the
+   * last refresh and pools that have been removed.
+   */
+  private void refreshPools() {
+    List<String> pools = this.poolManager.getNodePools();
+    if (pools != null) {
+
+      HashSet<String> removedPools =
+          computePoolDifference(this.poolNames, new HashSet<>(pools));
+
+      HashSet<String> addedPools =
+          computePoolDifference(new HashSet<>(pools), this.poolNames);
+      // TODO: Support remove pool API in pool manager so that this code
+      // path can be tested. This never happens in the current code base.
+      for (String poolName : removedPools) {
+        for (PeriodicPool periodicPool : poolQueue) {
+          if (periodicPool.getPoolName().compareTo(poolName) == 0) {
+            poolQueue.remove(periodicPool);
+          }
+        }
+      }
+      // Remove the pool names that we have in the list.
+      this.poolNames.removeAll(removedPools);
+
+      for (String poolName : addedPools) {
+        poolQueue.add(new PeriodicPool(poolName));
+      }
+
+      // Add to the pool names we are tracking.
+      poolNames.addAll(addedPools);
+    }
+
+  }
+
+  /**
+   * Computes the set difference: pool names present in newPools but
+   * absent from oldPool.
+   *
+   * @param newPools - New pool set.
+   * @param oldPool - Old pool set.
+   */
+  private HashSet<String> computePoolDifference(HashSet<String> newPools,
+      Set<String> oldPool) {
+    Preconditions.checkNotNull(newPools);
+    Preconditions.checkNotNull(oldPool);
+    HashSet<String> newSet = new HashSet<>(newPools);
+    newSet.removeAll(oldPool);
+    return newSet;
+  }
+
+  private void initPoolProcessThread() {
+
+    /*
+     * Task that runs to check if we need to start a pool processing job.
+     * If so, we create a pool reconciliation job and find out if all the
+     * expected containers are on the nodes.
+     */
+    Runnable processPools = () -> {
+      while (runnable.get()) {
+        // Make sure that we don't have any new pools.
+        refreshPools();
+        PeriodicPool pool = poolQueue.poll();
+        if (pool != null) {
+          if (pool.getLastProcessedTime() + this.containerProcessingLag <
+              Time.monotonicNow()) {
+            LOG.debug("Adding pool {} to container processing queue", pool
+                .getPoolName());
+            InProgressPool inProgressPool =  new InProgressPool(maxPoolWait,
+                pool, this.nodeManager, this.poolManager, this.commandQueue,
+                this.executorService);
+            inProgressPool.startReconciliation();
+            inProgressPoolList.add(inProgressPool);
+            poolProcessCount++;
+          } else {
+            LOG.debug("Not within the time window for processing: {}",
+                pool.getPoolName());
+            // Put back this pool since we are not planning to process it.
+            poolQueue.add(pool);
+            // We might oversleep here; not a big deal.
+            sleepUninterruptibly(this.containerProcessingLag,
+                TimeUnit.MILLISECONDS);
+          }
+        }
+        sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS);
+      }
+    };
+
+    // We will have only one thread for pool processing.
+    Thread poolProcessThread = new Thread(processPools);
+    poolProcessThread.setDaemon(true);
+    poolProcessThread.setName("Pool replica thread");
+    poolProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
+      // Let us just restart this thread after logging a critical error.
+      // if this thread is not running we cannot handle commands from SCM.
+      LOG.error("Critical Error : Pool replica thread encountered an " +
+          "error. Thread: {} Error Count : {}", t.toString(),
+          threadFaultCount.incrementAndGet(), e);
+      poolProcessThread.start();
+      // TODO : Add a config to restrict how many times we will restart this
+      // thread in a single session.
+    });
+    poolProcessThread.start();
+  }
+
+  /**
+   * Adds a container report to the appropriate in-progress pool.
+   * @param containerReport  -- Container report for a specific container from
+   * a datanode.
+   */
+  public void handleContainerReport(ContainerReportsProto containerReport) {
+    String poolName = poolManager.getNodePool(
+        DatanodeID.getFromProtoBuf(containerReport.getDatanodeID()));
+
+    for(InProgressPool ppool : inProgressPoolList) {
+      if(ppool.getPoolName().equalsIgnoreCase(poolName)) {
+        ppool.handleContainerReport(containerReport);
+        return;
+      }
+    }
+    // TODO: Decide if we can do anything else with this report.
+    LOG.debug("Discarding the container report for pool {}. That pool is not " +
+        "currently in the pool reconciliation process. Datanode: {}",
+        poolName, containerReport.getDatanodeID());
+  }
+
+  /**
+   * Get in process pool list, used for testing.
+   * @return List of InProgressPool
+   */
+  @VisibleForTesting
+  public List<InProgressPool> getInProcessPoolList() {
+    return inProgressPoolList;
+  }
+
+  /**
+   * Shutdown the Container Replication Manager.
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+    setExit();
+    HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
+  }
+}

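The selection logic in initPoolProcessThread reduces to a priority-queue
poll; a self-contained sketch of that step (the 60-second lag mirrors the
default from ScmConfigKeys, and the pool names are illustrative):

  import org.apache.hadoop.ozone.scm.container.replication.PeriodicPool;
  import org.apache.hadoop.util.Time;

  import java.util.PriorityQueue;

  public class PoolSchedulingSketch {
    public static void main(String[] args) {
      PriorityQueue<PeriodicPool> poolQueue = new PriorityQueue<>();
      poolQueue.add(new PeriodicPool("pool-a"));
      poolQueue.add(new PeriodicPool("pool-b"));

      long lagMillis = 60 * 1000; // default processing interval
      // The queue orders pools by lastProcessedTime, so poll() returns
      // the pool that has waited the longest.
      PeriodicPool next = poolQueue.poll();
      if (next.getLastProcessedTime() + lagMillis < Time.monotonicNow()) {
        // Eligible: this is where an InProgressPool would be created.
        next.setLastProcessedTime(Time.monotonicNow());
        next.incTotalProcessedCount();
      }
      poolQueue.add(next); // put the pool back for the next cycle
    }
  }
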
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java
new file mode 100644
index 0000000..87629e1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/InProgressPool.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.container.replication;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.scm.node.CommandQueue;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static com.google.common.util.concurrent.Uninterruptibles
+    .sleepUninterruptibly;
+import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY;
+import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE;
+import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.UNKNOWN;
+
+/**
+ * An in-progress pool is a pool whose containers are actively being
+ * checked for replication status.
+ */
+public final class InProgressPool {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(InProgressPool.class);
+  private final PeriodicPool pool;
+  private final CommandQueue commandQueue;
+  private final NodeManager nodeManager;
+  private final NodePoolManager poolManager;
+  private final ExecutorService executorService;
+  private final Map<String, Integer> containerCountMap;
+  private final Map<String, Boolean> processedNodeSet;
+  private final long startTime;
+  private ProgressStatus status;
+  private AtomicInteger nodeCount;
+  private AtomicInteger nodeProcessed;
+  private AtomicInteger containerProcessedCount;
+  private int maxWaitTime;
+  /**
+   * Constructs a pool that is being processed.
+   *
+   * @param maxWaitTime - Maximum wait time in milliseconds.
+   * @param pool - Pool that we are working against
+   * @param nodeManager - Nodemanager
+   * @param poolManager - pool manager
+   * @param commandQueue - Command queue
+   * @param executorService - Shared Executor service.
+   */
+  InProgressPool(int maxWaitTime, PeriodicPool pool,
+      NodeManager nodeManager, NodePoolManager poolManager,
+      CommandQueue commandQueue, ExecutorService executorService) {
+    Preconditions.checkNotNull(pool);
+    Preconditions.checkNotNull(nodeManager);
+    Preconditions.checkNotNull(poolManager);
+    Preconditions.checkNotNull(commandQueue);
+    Preconditions.checkNotNull(executorService);
+    Preconditions.checkArgument(maxWaitTime > 0);
+    this.pool = pool;
+    this.nodeManager = nodeManager;
+    this.poolManager = poolManager;
+    this.commandQueue = commandQueue;
+    this.executorService = executorService;
+    this.containerCountMap = new ConcurrentHashMap<>();
+    this.processedNodeSet = new ConcurrentHashMap<>();
+    this.maxWaitTime = maxWaitTime;
+    startTime = Time.monotonicNow();
+  }
+
+  /**
+   * Returns periodic pool.
+   *
+   * @return PeriodicPool
+   */
+  public PeriodicPool getPool() {
+    return pool;
+  }
+
+  /**
+   * We are done if we have received reports from all nodes or we are
+   * done waiting for the specified time.
+   *
+   * @return true if we are done, false otherwise.
+   */
+  public boolean isDone() {
+    return (nodeCount.get() == nodeProcessed.get()) ||
+        (this.startTime + this.maxWaitTime) < Time.monotonicNow();
+  }
+
+  /**
+   * Gets the number of containers processed.
+   *
+   * @return int
+   */
+  public int getContainerProcessedCount() {
+    return containerProcessedCount.get();
+  }
+
+  /**
+   * Returns the start time in milliseconds.
+   *
+   * @return - Start Time.
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Get the number of nodes in this pool.
+   *
+   * @return - node count
+   */
+  public int getNodeCount() {
+    return nodeCount.get();
+  }
+
+  /**
+   * Get the number of nodes that we have already processed container reports
+   * from.
+   *
+   * @return - Processed count.
+   */
+  public int getNodeProcessed() {
+    return nodeProcessed.get();
+  }
+
+  /**
+   * Returns the current status.
+   *
+   * @return Status
+   */
+  public ProgressStatus getStatus() {
+    return status;
+  }
+
+  /**
+   * Starts the reconciliation process for all the nodes in the pool.
+   */
+  public void startReconciliation() {
+    List<DatanodeID> datanodeIDList =
+        this.poolManager.getNodes(pool.getPoolName());
+    if (datanodeIDList.isEmpty()) {
+      LOG.error("Datanode list for {} is empty. Pool with no nodes?",
+          pool.getPoolName());
+      this.status = ProgressStatus.Error;
+      return;
+    }
+
+    nodeProcessed = new AtomicInteger(0);
+    containerProcessedCount = new AtomicInteger(0);
+    nodeCount = new AtomicInteger(0);
+    /*
+       Ask each datanode to send us commands.
+     */
+    SendContainerCommand cmd = SendContainerCommand.newBuilder().build();
+    for (DatanodeID id : datanodeIDList) {
+      NodeManager.NODESTATE currentState = getNodestate(id);
+      if (currentState == HEALTHY || currentState == STALE) {
+        nodeCount.incrementAndGet();
+        // Queue commands to all datanodes in this pool to send us container
+        // report. Since we ignore dead nodes, it is possible that we would have
+        // over replicated the container if the node comes back.
+        commandQueue.addCommand(id, cmd);
+      }
+    }
+    this.status = ProgressStatus.InProgress;
+    this.getPool().setLastProcessedTime(Time.monotonicNow());
+  }
+
+  /**
+   * Gets the node state.
+   *
+   * @param id - datanode ID.
+   * @return NodeState.
+   */
+  private NodeManager.NODESTATE getNodestate(DatanodeID id) {
+    NodeManager.NODESTATE currentState = UNKNOWN;
+    int maxTry = 100;
+    // We need to loop to make sure that we will retry if we get
+    // node state unknown. This can lead to infinite loop if we send
+    // in unknown node ID. So max try count is used to prevent it.
+
+    int currentTry = 0;
+    while (currentState == UNKNOWN && currentTry < maxTry) {
+      // Retry to make sure that we deal with the case of node state not
+      // known.
+      currentState = nodeManager.getNodeState(id);
+      currentTry++;
+      if (currentState == UNKNOWN) {
+        // Sleep to make sure that this is not a tight loop.
+        sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+      }
+    }
+    if (currentState == UNKNOWN) {
+      LOG.error("Not able to determine the state of Node: {}, Exceeded max " +
+          "try and node manager returns UNKNOWN state. This indicates we " +
+          "are dealing with a node that we don't know about.", id);
+    }
+    return currentState;
+  }
+
+  /**
+   * Queues a container report for handling. This is done in a worker thread
+   * since decoding a container report might be compute intensive. We don't
+   * want to block since we have asked for a bunch of container reports
+   * from a set of datanodes.
+   *
+   * @param containerReport - ContainerReport
+   */
+  public void handleContainerReport(ContainerReportsProto containerReport) {
+    executorService.submit(processContainerReport(containerReport));
+  }
+
+  private Runnable processContainerReport(ContainerReportsProto reports) {
+    return () -> {
+      DatanodeID datanodeID =
+          DatanodeID.getFromProtoBuf(reports.getDatanodeID());
+      if (processedNodeSet.putIfAbsent(datanodeID.getDatanodeUuid(),
+          true) == null) {
+        nodeProcessed.incrementAndGet();
+        LOG.debug("Total Nodes processed : {} Node Name: {} ", nodeProcessed,
+            datanodeID.getDatanodeUuid());
+        for (ContainerInfo info : reports.getReportsList()) {
+          containerProcessedCount.incrementAndGet();
+          LOG.debug("Total Containers processed: {} Container Name: {}",
+              containerProcessedCount.get(), info.getContainerName());
+
+          // Update the container map with count + 1 if the key exists or
+          // update the map with 1. Since this is a concurrentMap the
+          // computation and update is atomic.
+          containerCountMap.merge(info.getContainerName(), 1, Integer::sum);
+        }
+      }
+    };
+  }
+
+  /**
+   * Filter the containers based on specific rules.
+   *
+   * @param predicate -- Predicate to filter by
+   * @return A list of map entries.
+   */
+  public List<Map.Entry<String, Integer>> filterContainer(
+      Predicate<Map.Entry<String, Integer>> predicate) {
+    return containerCountMap.entrySet().stream()
+        .filter(predicate).collect(Collectors.toList());
+  }
+
+  /**
+   * Used only for testing; calling this will abort container report
+   * processing. This is a very dangerous call and should not be made by
+   * any user.
+   */
+  @VisibleForTesting
+  public void setDoneProcessing() {
+    nodeProcessed.set(nodeCount.get());
+  }
+
+  /**
+   * Returns the pool name.
+   *
+   * @return Name of the pool.
+   */
+  String getPoolName() {
+    return pool.getPoolName();
+  }
+
+  /**
+   * Current status of the computing replication status.
+   */
+  public enum ProgressStatus {
+    InProgress, Done, Error
+  }
+}

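Once reconciliation finishes, filterContainer gives the replication
breakdown. A sketch, assuming a target replication factor of 3 (the
factor is an assumption here; the patch does not define one):

  import org.apache.hadoop.ozone.scm.container.replication.InProgressPool;

  import java.util.List;
  import java.util.Map;

  public class ReplicationLevelSketch {
    // Sketch only: the caller supplies a pool whose reconciliation is done.
    static void report(InProgressPool pool) {
      int replicationFactor = 3; // assumed target, not part of this patch
      List<Map.Entry<String, Integer>> under =
          pool.filterContainer(e -> e.getValue() < replicationFactor);
      List<Map.Entry<String, Integer>> over =
          pool.filterContainer(e -> e.getValue() > replicationFactor);
      System.out.println(under.size() + " under-replicated, "
          + over.size() + " over-replicated containers");
    }
  }
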
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/PeriodicPool.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/PeriodicPool.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/PeriodicPool.java
new file mode 100644
index 0000000..35b1e76
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/PeriodicPool.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.container.replication;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A periodic pool is a pool with a timestamp; this allows us to process
+ * pools based on a cyclic clock.
+ */
+public class PeriodicPool implements Comparable<PeriodicPool> {
+  private final String poolName;
+  private long lastProcessedTime;
+  private AtomicLong totalProcessedCount;
+
+  /**
+   * Constructs a periodic pool.
+   *
+   * @param poolName - Name of the pool
+   */
+  public PeriodicPool(String poolName) {
+    this.poolName = poolName;
+    lastProcessedTime = 0;
+    totalProcessedCount = new AtomicLong(0);
+  }
+
+  /**
+   * Get pool Name.
+   * @return PoolName
+   */
+  public String getPoolName() {
+    return poolName;
+  }
+
+  /**
+   * Compares this object with the specified object for order.  Returns a
+   * negative integer, zero, or a positive integer as this object is less
+   * than, equal to, or greater than the specified object.
+   *
+   * @param o the object to be compared.
+   * @return a negative integer, zero, or a positive integer as this object is
+   * less than, equal to, or greater than the specified object.
+   * @throws NullPointerException if the specified object is null
+   * @throws ClassCastException   if the specified object's type prevents it
+   *                              from being compared to this object.
+   */
+  @Override
+  public int compareTo(PeriodicPool o) {
+    return Long.compare(this.lastProcessedTime, o.lastProcessedTime);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    PeriodicPool that = (PeriodicPool) o;
+
+    return poolName.equals(that.poolName);
+  }
+
+  @Override
+  public int hashCode() {
+    return poolName.hashCode();
+  }
+
+  /**
+   * Returns the total number of times we have processed this pool.
+   *
+   * @return processed count.
+   */
+  public long getTotalProcessedCount() {
+    return totalProcessedCount.get();
+  }
+
+  /**
+   * Gets the last time we processed this pool.
+   * @return time in milliseconds
+   */
+  public long getLastProcessedTime() {
+    return this.lastProcessedTime;
+  }
+
+
+  /**
+   * Sets the last processed time.
+   *
+   * @param lastProcessedTime - Long in milliseconds.
+   */
+  public void setLastProcessedTime(long lastProcessedTime) {
+    this.lastProcessedTime = lastProcessedTime;
+  }
+
+  /**
+   * Increments the total processed count.
+   */
+  public void incTotalProcessedCount() {
+    this.totalProcessedCount.incrementAndGet();
+  }
+}
\ No newline at end of file

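One subtlety worth noting: equality and hashing use only the pool name,
while ordering uses the last processed time, so compareTo is not
consistent with equals (which is fine for PriorityQueue). A small sketch:

  import org.apache.hadoop.ozone.scm.container.replication.PeriodicPool;
  import org.apache.hadoop.util.Time;

  public class PeriodicPoolOrderingSketch {
    public static void main(String[] args) {
      PeriodicPool a = new PeriodicPool("pool-a");
      PeriodicPool b = new PeriodicPool("pool-a");
      // Same name: equal and same hash, regardless of timestamps.
      assert a.equals(b) && a.hashCode() == b.hashCode();
      // Ordering follows lastProcessedTime, not the name.
      b.setLastProcessedTime(Time.monotonicNow());
      assert a.compareTo(b) < 0; // the never-processed pool sorts first
    }
  }
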
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/package-info.java
new file mode 100644
index 0000000..82e4202
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/replication/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.container.replication;
+/*
+ This package contains routines that manage replication of a container. This
+ relies on container reports to understand the replication level of a
+ container - UnderReplicated, Replicated, OverReplicated -- and manages the
+ replication level based on that.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java
index 5932606..bbf319b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/CommandQueue.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,8 +17,11 @@
  */
 package org.apache.hadoop.ozone.scm.node;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
 
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -35,18 +38,43 @@ import java.util.concurrent.locks.ReentrantLock;
  * they were queued.
  */
 public class CommandQueue {
-
-  private final Map<DatanodeID, List<SCMCommand>> commandMap;
-  private final Lock lock;
-  // This map is used as default return value.
+  // This list is used as default return value.
   private static final List<SCMCommand> DEFAULT_LIST = new LinkedList<>();
+  private final Map<DatanodeID, Commands> commandMap;
+  private final Lock lock;
+  private long commandsInQueue;
+
+  /**
+   * Returns number of commands in queue.
+   * @return Command Count.
+   */
+  public long getCommandsInQueue() {
+    return commandsInQueue;
+  }
 
   /**
    * Constructs a Command Queue.
+   * TODO : Add a flusher thread that throws away commands older than a certain
+   * time period.
    */
   public CommandQueue() {
     commandMap = new HashMap<>();
     lock = new ReentrantLock();
+    commandsInQueue = 0;
+  }
+
+  /**
+   * This function is used only for test purposes.
+   */
+  @VisibleForTesting
+  public void clear() {
+    lock.lock();
+    try {
+      commandMap.clear();
+      commandsInQueue = 0;
+    } finally {
+      lock.unlock();
+    }
   }
 
   /**
@@ -61,8 +89,15 @@ public class CommandQueue {
   List<SCMCommand> getCommand(final DatanodeID datanodeID) {
     lock.lock();
     try {
-      List<SCMCommand> cmds = commandMap.remove(datanodeID);
-      return cmds == null ? DEFAULT_LIST : cmds;
+      Commands cmds = commandMap.remove(datanodeID);
+      List<SCMCommand> cmdList = null;
+      if(cmds != null) {
+        cmdList = cmds.getCommands();
+        commandsInQueue -= cmdList.size();
+        // A post condition really.
+        Preconditions.checkState(commandsInQueue >= 0);
+      }
+      return cmds == null ? DEFAULT_LIST : cmdList;
     } finally {
       lock.unlock();
     }
@@ -74,19 +109,82 @@ public class CommandQueue {
    * @param datanodeID DatanodeID
    * @param command    - Command
    */
-  void addCommand(final DatanodeID datanodeID, final SCMCommand command) {
+  public void addCommand(final DatanodeID datanodeID, final SCMCommand
+      command) {
     lock.lock();
     try {
       if (commandMap.containsKey(datanodeID)) {
         commandMap.get(datanodeID).add(command);
       } else {
-        LinkedList<SCMCommand> newList = new LinkedList<>();
-        newList.add(command);
-        commandMap.put(datanodeID, newList);
+        commandMap.put(datanodeID, new Commands(command));
       }
+      commandsInQueue++;
     } finally {
       lock.unlock();
     }
   }
 
+  /**
+   * Class that stores commands for a datanode.
+   */
+  private static class Commands {
+    private long updateTime;
+    private long readTime;
+    private List<SCMCommand> commands;
+
+    /**
+     * Constructs a Commands class.
+     */
+    Commands() {
+      commands = new LinkedList<>();
+      updateTime = 0;
+      readTime = 0;
+    }
+
+    /**
+     * Creates the object and populates with the command.
+     * @param command command to add to queue.
+     */
+    Commands(SCMCommand command) {
+      this();
+      this.add(command);
+    }
+
+    /**
+     * Gets the last time the commands for this node were updated.
+     * @return Time stamp
+     */
+    public long getUpdateTime() {
+      return updateTime;
+    }
+
+    /**
+     * Gets the last read time.
+     * @return last time when these commands were read from this queue.
+     */
+    public long getReadTime() {
+      return readTime;
+    }
+
+    /**
+     * Adds a command to the list.
+     *
+     * @param command SCMCommand
+     */
+    public void add(SCMCommand command) {
+      this.commands.add(command);
+      updateTime = Time.monotonicNow();
+    }
+
+    /**
+     * Returns the commands for this datanode.
+     * @return command list.
+     */
+    public List<SCMCommand> getCommands() {
+      List<SCMCommand> temp = this.commands;
+      this.commands = new LinkedList<>();
+      readTime = Time.monotonicNow();
+      return temp;
+    }
+  }
 }

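A sketch of the queue's drain-on-read semantics (the DatanodeID
construction is illustrative, and since getCommand is package-private
the sketch lives in the same package):

  package org.apache.hadoop.ozone.scm.node;

  import org.apache.hadoop.hdfs.protocol.DatanodeID;
  import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
  import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;

  import java.util.List;
  import java.util.UUID;

  public class CommandQueueSketch {
    public static void main(String[] args) {
      CommandQueue queue = new CommandQueue();
      // Illustrative DatanodeID; any valid ID works here.
      DatanodeID dn = new DatanodeID("127.0.0.1", "localhost",
          UUID.randomUUID().toString(), 0, 0, 0, 0);
      queue.addCommand(dn, SendContainerCommand.newBuilder().build());
      // getCommand removes and returns the node's pending commands.
      List<SCMCommand> pending = queue.getCommand(dn); // one command
      List<SCMCommand> empty = queue.getCommand(dn);   // default empty list
    }
  }
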
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
index d4ca85f..3670f56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
@@ -109,7 +109,8 @@ public interface NodeManager extends StorageContainerNodeProtocol,
   enum NODESTATE {
     HEALTHY,
     STALE,
-    DEAD
+    DEAD,
+    UNKNOWN
   }
 
   /**
@@ -137,4 +138,11 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    */
   @VisibleForTesting
   boolean waitForHeartbeatProcessed();
+
+  /**
+   * Returns the node state of a specific node.
+   * @param id - DatanodeID
+   * @return Healthy/Stale/Dead/Unknown.
+   */
+  NODESTATE getNodeState(DatanodeID id);
 }

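Callers now have a fourth state to handle; a sketch of how a caller might
branch on it (the helper name is made up for illustration):

  import org.apache.hadoop.hdfs.protocol.DatanodeID;
  import org.apache.hadoop.ozone.scm.node.NodeManager;

  public class NodeStateSketch {
    static boolean eligibleForReports(NodeManager nodeManager, DatanodeID id) {
      switch (nodeManager.getNodeState(id)) {
      case HEALTHY:
      case STALE:
        return true;  // still reachable; safe to ask for container reports
      case DEAD:
        return false; // dead nodes are not queried
      case UNKNOWN:
      default:
        // Transient: the node may be moving between lists. Callers can
        // retry after a short sleep, as InProgressPool#getNodestate does.
        return false;
      }
    }
  }
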
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
index 3ab7275..a71fbcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
@@ -376,6 +376,12 @@ public class SCMNodeManager
       return staleNodeCount.get();
     case DEAD:
       return deadNodeCount.get();
+    case UNKNOWN:
+      // Some nodes can be in transit between the other states, so it is
+      // not possible to return a meaningful count for UNKNOWN. This state
+      // exists because this information is not always consistent.
+      return 0;
     default:
       throw new IllegalArgumentException("Unknown node state requested.");
     }
@@ -393,6 +399,37 @@ public class SCMNodeManager
   }
 
   /**
+   * Returns the node state of a specific node.
+   *
+   * @param id - DatanodeID
+   * @return Healthy/Stale/Dead/Unknown.
+   */
+  @Override
+  public NODESTATE getNodeState(DatanodeID id) {
+    // There is a subtle race condition here, hence we also support
+    // NODESTATE.UNKNOWN. It is possible that just before we check
+    // healthyNodes, the node has been removed from the healthy list but
+    // not yet added to the stale nodes list.
+    // We could fix that by adding the node to the stale list before
+    // removing it from the healthy list, but then the node would briefly
+    // be in two states. Instead we just deal with the possibility of UNKNOWN.
+
+    if(healthyNodes.containsKey(id.getDatanodeUuid())) {
+      return NODESTATE.HEALTHY;
+    }
+
+    if(staleNodes.containsKey(id.getDatanodeUuid())) {
+      return NODESTATE.STALE;
+    }
+
+    if(deadNodes.containsKey(id.getDatanodeUuid())) {
+      return NODESTATE.DEAD;
+    }
+
+    return NODESTATE.UNKNOWN;
+  }
+
+  /**
    * This is the real worker thread that processes the HB queue. We do the
    * following things in this thread.
    * <p>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index 212d9b7..aa52979 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -110,8 +110,9 @@ message ContainerReportsProto {
     fullReport = 0;
     deltaReport = 1;
   }
-  repeated ContainerInfo reports = 1;
-  required reportType type = 2;
+  required DatanodeIDProto datanodeID = 1;
+  repeated ContainerInfo reports = 2;
+  required reportType type = 3;
 }
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java
new file mode 100644
index 0000000..d0f440f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationDatanodeStateManager.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.TestUtils;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * This class manages the state of datanodes
+ * in conjunction with the node pool and node managers.
+ */
+public class ReplicationDatanodeStateManager {
+  private final NodeManager nodeManager;
+  private final NodePoolManager poolManager;
+  private final Random r;
+
+  /**
+   * Constructs the datanode state manager.
+   *
+   * @param nodeManager - Node manager.
+   * @param poolManager - Node pool manager.
+   */
+  public ReplicationDatanodeStateManager(NodeManager nodeManager,
+      NodePoolManager poolManager) {
+    this.nodeManager = nodeManager;
+    this.poolManager = poolManager;
+    r = new Random();
+  }
+
+  /**
+   * Get Container Report as if it is from a datanode in the cluster.
+   * @param containerName - Container Name.
+   * @param poolName - Pool Name.
+   * @param dataNodeCount - Datanode Count.
+   * @return List of Container Reports.
+   */
+  public List<ContainerReportsProto> getContainerReport(String containerName,
+      String poolName, int dataNodeCount) {
+    List<ContainerReportsProto> containerList = new LinkedList<>();
+    List<DatanodeID> nodesInPool = poolManager.getNodes(poolName);
+
+    if (nodesInPool == null) {
+      return containerList;
+    }
+
+    if (nodesInPool.size() < dataNodeCount) {
+      throw new IllegalStateException("Not enough datanodes to create " +
+          "required container reports");
+    }
+
+    while (containerList.size() < dataNodeCount && nodesInPool.size() > 0) {
+      DatanodeID id = nodesInPool.get(r.nextInt(nodesInPool.size()));
+      nodesInPool.remove(id);
+      // We return container reports only for nodes that are healthy.
+      if (nodeManager.getNodeState(id) == NodeManager.NODESTATE.HEALTHY) {
+        ContainerInfo info = ContainerInfo.newBuilder()
+            .setContainerName(containerName)
+            .setFinalhash(DigestUtils.sha256Hex(containerName))
+            .build();
+        ContainerReportsProto containerReport = ContainerReportsProto
+            .newBuilder().addReports(info)
+            .setDatanodeID(id.getProtoBufMessage())
+            .setType(ContainerReportsProto.reportType.fullReport)
+            .build();
+        containerList.add(containerReport);
+      }
+    }
+    return containerList;
+  }
+}

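A sketch of how a test might wire this helper to the replication manager
(the managers come from test setup, and the container and pool names are
illustrative; TestContainerReplicationManager in this patch does the full
setup):

  import org.apache.hadoop.ozone.container.TestUtils
      .ReplicationDatanodeStateManager;
  import org.apache.hadoop.ozone.protocol.proto
      .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
  import org.apache.hadoop.ozone.scm.container.replication
      .ContainerReplicationManager;
  import org.apache.hadoop.ozone.scm.node.NodeManager;
  import org.apache.hadoop.ozone.scm.node.NodePoolManager;

  import java.util.List;

  public class ReportInjectionSketch {
    static void inject(NodeManager nodeManager, NodePoolManager poolManager,
        ContainerReplicationManager replicationManager) {
      ReplicationDatanodeStateManager stateManager =
          new ReplicationDatanodeStateManager(nodeManager, poolManager);
      List<ContainerReportsProto> reports =
          stateManager.getContainerReport("container-1", "pool-a", 3);
      for (ContainerReportsProto report : reports) {
        replicationManager.handleContainerReport(report);
      }
    }
  }
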
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java
new file mode 100644
index 0000000..e432c26
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodeManagerMock.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.TestUtils;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.ozone.protocol.VersionResponse;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A Node Manager to test replication.
+ */
+public class ReplicationNodeManagerMock implements NodeManager {
+  private final Map<DatanodeID, NODESTATE> nodeStateMap;
+
+  /**
+   * Constructs the mock from a map of datanodes to their current states.
+   * @param nodeState A node state map.
+   */
+  public ReplicationNodeManagerMock(Map<DatanodeID, NODESTATE> nodeState) {
+    Preconditions.checkNotNull(nodeState);
+    nodeStateMap = nodeState;
+  }
+
+  /**
+   * Get the minimum number of nodes to get out of chill mode.
+   *
+   * @return int
+   */
+  @Override
+  public int getMinimumChillModeNodes() {
+    return 0;
+  }
+
+  /**
+   * Reports if we have exited out of chill mode by discovering enough nodes.
+   *
+   * @return True if we are out of Node layer chill mode, false otherwise.
+   */
+  @Override
+  public boolean isOutOfNodeChillMode() {
+    return !nodeStateMap.isEmpty();
+  }
+
+  /**
+   * Returns a chill mode status string.
+   *
+   * @return String
+   */
+  @Override
+  public String getChillModeStatus() {
+    return null;
+  }
+
+  /**
+   * Returns the status of manual chill mode flag.
+   *
+   * @return true if forceEnterChillMode has been called, false if
+   * forceExitChillMode has been called or the flag was cleared, e.g. via
+   * clearChillModeFlag.
+   */
+  @Override
+  public boolean isInManualChillMode() {
+    return false;
+  }
+
+  /**
+   * Get the number of data nodes in each state.
+   *
+   * @return A mapping from node state to the number of nodes in that state.
+   */
+  @Override
+  public Map<String, Integer> getNodeCount() {
+    return null;
+  }
+
+  /**
+   * Removes a data node from the management of this Node Manager.
+   *
+   * @param node - DataNode.
+   * @throws UnregisteredNodeException
+   */
+  @Override
+  public void removeNode(DatanodeID node) throws UnregisteredNodeException {
+    nodeStateMap.remove(node);
+
+  }
+
+  /**
+   * Gets all live datanodes that are currently communicating with SCM.
+   *
+   * @param nodestate - State of the node
+   * @return List of Datanodes that are Heartbeating SCM.
+   */
+  @Override
+  public List<DatanodeID> getNodes(NODESTATE nodestate) {
+    return null;
+  }
+
+  /**
+   * Returns the Number of Datanodes that are communicating with SCM.
+   *
+   * @param nodestate - State of the node
+   * @return int -- count
+   */
+  @Override
+  public int getNodeCount(NODESTATE nodestate) {
+    return 0;
+  }
+
+  /**
+   * Get all datanodes known to SCM.
+   *
+   * @return List of DatanodeIDs known to SCM.
+   */
+  @Override
+  public List<DatanodeID> getAllNodes() {
+    return null;
+  }
+
+  /**
+   * Chill mode is the period when node manager waits for a minimum
+   * configured number of datanodes to report in. This is called chill mode
+   * to indicate the period before node manager gets into action.
+   * <p>
+   * Forcefully exits the chill mode, even if we have not met the minimum
+   * criteria of the nodes reporting in.
+   */
+  @Override
+  public void forceExitChillMode() {
+
+  }
+
+  /**
+   * Forcefully enters chill mode, even if all minimum node conditions are met.
+   */
+  @Override
+  public void forceEnterChillMode() {
+
+  }
+
+  /**
+   * Clears the manual chill mode flag.
+   */
+  @Override
+  public void clearChillModeFlag() {
+
+  }
+
+  /**
+   * Returns the aggregated node stats.
+   *
+   * @return the aggregated node stats.
+   */
+  @Override
+  public SCMNodeStat getStats() {
+    return null;
+  }
+
+  /**
+   * Return a map of node stats.
+   *
+   * @return a map of individual node stats (live/stale but not dead).
+   */
+  @Override
+  public Map<String, SCMNodeStat> getNodeStats() {
+    return null;
+  }
+
+  /**
+   * Return the node stat of the specified datanode.
+   *
+   * @param datanodeID - datanode ID.
+   * @return node stat if it is live/stale, null if it is dead or doesn't
+   * exist.
+   */
+  @Override
+  public SCMNodeMetric getNodeStat(DatanodeID datanodeID) {
+    return null;
+  }
+
+  /**
+   * Waits until the heartbeat has been processed by the NodeManager.
+   *
+   * @return true if the heartbeat has been processed.
+   */
+  @Override
+  public boolean waitForHeartbeatProcessed() {
+    return false;
+  }
+
+  /**
+   * Returns the node state of a specific node.
+   *
+   * @param id - DatanodeID
+   * @return Healthy/Stale/Dead.
+   */
+  @Override
+  public NODESTATE getNodeState(DatanodeID id) {
+    return nodeStateMap.get(id);
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated
+   * with it. If the stream is already closed then invoking this
+   * method has no effect.
+   * <p>
+   * <p> As noted in {@link AutoCloseable#close()}, cases where the
+   * close may fail require careful attention. It is strongly advised
+   * to relinquish the underlying resources and to internally
+   * <em>mark</em> the {@code Closeable} as closed, prior to throwing
+   * the {@code IOException}.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+
+  }
+
+  /**
+   * When an object implementing interface <code>Runnable</code> is used
+   * to create a thread, starting the thread causes the object's
+   * <code>run</code> method to be called in that separately executing
+   * thread.
+   * <p>
+   * The general contract of the method <code>run</code> is that it may
+   * take any action whatsoever.
+   *
+   * @see Thread#run()
+   */
+  @Override
+  public void run() {
+
+  }
+
+  /**
+   * Gets the version info from SCM.
+   *
+   * @param versionRequest - version Request.
+   * @return SCM version info and other information needed by the datanode.
+   */
+  @Override
+  public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
+    return null;
+  }
+
+  /**
+   * Registers the node when the datanode finds that it is not registered
+   * with any SCM.
+   *
+   * @param datanodeID - DatanodeID with node info; the datanode UUID may be
+   * empty, in which case the server assigns and returns one.
+   * @return SCMCommand
+   */
+  @Override
+  public SCMCommand register(DatanodeID datanodeID) {
+    return null;
+  }
+
+  /**
+   * Send heartbeat to indicate the datanode is alive and doing well.
+   *
+   * @param datanodeID - Datanode ID.
+   * @param nodeReport - node report.
+   * @return list of SCMCommands issued in response to the heartbeat
+   */
+  @Override
+  public List<SCMCommand> sendHeartbeat(DatanodeID datanodeID,
+      SCMNodeReport nodeReport) {
+    return null;
+  }
+
+  /**
+   * Clears all nodes from the node Manager.
+   */
+  public void clearMap() {
+    this.nodeStateMap.clear();
+  }
+
+  /**
+   * Adds a node to the existing Node manager. This is used only for test
+   * purposes.
+   * @param id - DatanodeID
+   * @param state State you want to put that node to.
+   */
+  public void addNode(DatanodeID id, NODESTATE state) {
+    nodeStateMap.put(id, state);
+  }
+
+}
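
For reference, a minimal sketch of how this mock can be driven from a test.
The variable names below are illustrative and not part of the patch; the
calls themselves (the map-based constructor, addNode, getNodeState and
isOutOfNodeChillMode) are the ones defined above:

    // Hypothetical wiring of ReplicationNodeManagerMock in a test.
    Map<DatanodeID, NodeManager.NODESTATE> state = new HashMap<>();
    ReplicationNodeManagerMock nodes = new ReplicationNodeManagerMock(state);
    DatanodeID dn = SCMTestUtils.getDatanodeID();     // random test datanode
    nodes.addNode(dn, NodeManager.NODESTATE.HEALTHY); // mark it healthy
    assert nodes.getNodeState(dn) == NodeManager.NODESTATE.HEALTHY;
    assert nodes.isOutOfNodeChillMode();              // true: map is non-empty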

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodePoolManagerMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodePoolManagerMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodePoolManagerMock.java
new file mode 100644
index 0000000..a86a54c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/ReplicationNodePoolManagerMock.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.TestUtils;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.scm.exceptions.SCMException;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A mock node pool manager for replication tests.
+ */
+public class ReplicationNodePoolManagerMock implements NodePoolManager {
+
+  private final Map<DatanodeID, String> nodeMemberShip;
+
+  /**
+   * A node pool manager for testing.
+   */
+  public ReplicationNodePoolManagerMock() {
+    nodeMemberShip = new HashMap<>();
+  }
+
+  /**
+   * Add a node to a node pool.
+   *
+   * @param pool - name of the node pool.
+   * @param node - data node.
+   */
+  @Override
+  public void addNode(String pool, DatanodeID node) {
+    nodeMemberShip.put(node, pool);
+  }
+
+  /**
+   * Remove a node from a node pool.
+   *
+   * @param pool - name of the node pool.
+   * @param node - data node.
+   * @throws SCMException
+   */
+  @Override
+  public void removeNode(String pool, DatanodeID node) throws SCMException {
+    nodeMemberShip.remove(node);
+  }
+
+  /**
+   * Get a list of known node pools.
+   *
+   * @return a list of known node pool names, or an empty list if no node
+   * pool is defined.
+   */
+  @Override
+  public List<String> getNodePools() {
+    // The set de-duplicates pool names shared by multiple nodes.
+    return new ArrayList<>(new HashSet<>(nodeMemberShip.values()));
+  }
+
+  /**
+   * Get all nodes of a node pool given the name of the node pool.
+   *
+   * @param pool - name of the node pool.
+   * @return a list of datanode ids or an empty list if the node pool was not
+   * found.
+   */
+  @Override
+  public List<DatanodeID> getNodes(String pool) {
+    Set<DatanodeID> datanodeSet = new HashSet<>();
+    for (Map.Entry<DatanodeID, String> entry : nodeMemberShip.entrySet()) {
+      if (entry.getValue().equals(pool)) {
+        datanodeSet.add(entry.getKey());
+      }
+    }
+    return new ArrayList<>(datanodeSet);
+  }
+
+  /**
+   * Get the node pool name if the node has been added to a node pool.
+   *
+   * @param datanodeID - datanode ID.
+   * @return node pool name if it has been assigned. null if the node has not
+   * been assigned to any node pool yet.
+   */
+  @Override
+  public String getNodePool(DatanodeID datanodeID) {
+    return nodeMemberShip.get(datanodeID);
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated
+   * with it. If the stream is already closed then invoking this
+   * method has no effect.
+   * <p>
+   * <p> As noted in {@link AutoCloseable#close()}, cases where the
+   * close may fail require careful attention. It is strongly advised
+   * to relinquish the underlying resources and to internally
+   * <em>mark</em> the {@code Closeable} as closed, prior to throwing
+   * the {@code IOException}.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+
+  }
+}
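
As a similarly hedged illustration, the pool mock reduces membership to a
single node-to-pool map, so a test can exercise it as below (the pool name
"PoolA" is made up for this sketch):

    // Hypothetical usage of ReplicationNodePoolManagerMock.
    NodePoolManager pools = new ReplicationNodePoolManagerMock();
    DatanodeID dn = SCMTestUtils.getDatanodeID();
    pools.addNode("PoolA", dn);                      // node joins the pool
    assert "PoolA".equals(pools.getNodePool(dn));    // reverse lookup works
    assert pools.getNodePools().contains("PoolA");   // pool is now known
    assert pools.getNodes("PoolA").contains(dn);     // membership query works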

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/package-info.java
new file mode 100644
index 0000000..ff597d5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/TestUtils/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.TestUtils;
+// Helper classes for ozone and container tests.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
index 252ff67..2789275 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/SCMTestUtils.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -138,8 +139,7 @@ public final class SCMTestUtils {
     try (ServerSocket socket = new ServerSocket(0)) {
       socket.setReuseAddress(true);
       int port = socket.getLocalPort();
-      String addr = InetAddress.getLoopbackAddress().getHostAddress()
-          .toString();
+      String addr = InetAddress.getLoopbackAddress().getHostAddress();
       return new InetSocketAddress(addr, port);
     }
   }
@@ -148,6 +148,10 @@ public final class SCMTestUtils {
     return new Configuration();
   }
 
+  public static OzoneConfiguration getOzoneConf() {
+    return new OzoneConfiguration();
+  }
+
   public static DatanodeID getDatanodeID(SCMNodeManager nodeManager) {
 
     return getDatanodeID(nodeManager, UUID.randomUUID().toString());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 3c3f3b7..63ada33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -61,6 +61,8 @@ import java.net.InetSocketAddress;
 import java.util.UUID;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.ozone.container.common.SCMTestUtils
+    .getDatanodeID;
 import static org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ReportState.states
     .noContainerReports;
@@ -193,7 +195,7 @@ public class TestEndPoint {
   public void testRegister() throws Exception {
     String[] scmAddressArray = new String[1];
     scmAddressArray[0] = serverAddress.toString();
-    DatanodeID nodeToRegister = SCMTestUtils.getDatanodeID();
+    DatanodeID nodeToRegister = getDatanodeID();
     try (EndpointStateMachine rpcEndPoint =
              SCMTestUtils.createEndpoint(
                  SCMTestUtils.getConf(), serverAddress, 1000)) {
@@ -218,7 +220,7 @@ public class TestEndPoint {
     if (!clearContainerID) {
       ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
           .setClusterID(UUID.randomUUID().toString())
-          .setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
+          .setDatanodeID(getDatanodeID().getProtoBufMessage())
           .build();
       endpointTask.setContainerNodeIDProto(containerNodeID);
     }
@@ -272,7 +274,7 @@ public class TestEndPoint {
 
   @Test
   public void testHeartbeat() throws Exception {
-    DatanodeID dataNode = SCMTestUtils.getDatanodeID();
+    DatanodeID dataNode = getDatanodeID();
     try (EndpointStateMachine rpcEndPoint =
              SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
                  serverAddress, 1000)) {
@@ -299,7 +301,7 @@ public class TestEndPoint {
             scmAddress, rpcTimeout)) {
     ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
         .setClusterID(UUID.randomUUID().toString())
-        .setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
+        .setDatanodeID(getDatanodeID().getProtoBufMessage())
         .build();
     rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
 
@@ -365,6 +367,8 @@ public class TestEndPoint {
       reportsBuilder.addReports(getRandomContainerReport()
           .getProtoBufMessage());
     }
+    reportsBuilder.setDatanodeID(SCMTestUtils.getDatanodeID()
+        .getProtoBufMessage());
     reportsBuilder.setType(StorageContainerDatanodeProtocolProtos
         .ContainerReportsProto.reportType.fullReport);
     return reportsBuilder.build();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 35e358a..3b53e8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.impl;
 
 import org.apache.commons.codec.binary.Hex;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -354,7 +355,7 @@ public class TestContainerPersistence {
     pipeline.setContainerName(containerName);
     ContainerData cData = new ContainerData(containerName);
     cData.addMetadata("VOLUME", "shire");
-    cData.addMetadata("owner)", "bilbo");
+    cData.addMetadata("owner", "bilbo");
     if(!containerManager.getContainerMap()
         .containsKey(containerName)) {
       containerManager.createContainer(pipeline, cData);
@@ -773,7 +774,7 @@ public class TestContainerPersistence {
 
   @Test
   public void testListKey() throws Exception {
-    String containerName = "c-0";
+    String containerName = "c0" + RandomStringUtils.randomAscii(10);
     Pipeline pipeline = createSingleNodePipeline(containerName);
     List<String> expectedKeys = new ArrayList<String>();
     for (int i = 0; i < 10; i++) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java
new file mode 100644
index 0000000..dc42a34
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplicationManager.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.container.TestUtils
+    .ReplicationDatanodeStateManager;
+import org.apache.hadoop.ozone.container.TestUtils.ReplicationNodeManagerMock;
+import org.apache.hadoop.ozone.container.TestUtils
+    .ReplicationNodePoolManagerMock;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.scm.container.replication
+    .ContainerReplicationManager;
+import org.apache.hadoop.ozone.scm.container.replication.InProgressPool;
+import org.apache.hadoop.ozone.scm.node.CommandQueue;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.ozone.scm.node.NodePoolManager;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.scm.ScmConfigKeys
+    .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS;
+import static org.apache.ratis.shaded.com.google.common.util.concurrent
+    .Uninterruptibles.sleepUninterruptibly;
+
+/**
+ * Tests for the container manager.
+ */
+public class TestContainerReplicationManager {
+  final static String POOL_NAME_TEMPLATE = "Pool%d";
+  static final int MAX_DATANODES = 72;
+  static final int POOL_SIZE = 24;
+  static final int POOL_COUNT = 3;
+  private LogCapturer logCapturer = LogCapturer.captureLogs(
+      LogFactory.getLog(ContainerReplicationManager.class));
+  private List<DatanodeID> datanodes = new LinkedList<>();
+  private NodeManager nodeManager;
+  private NodePoolManager poolManager;
+  private CommandQueue commandQueue;
+  private ContainerReplicationManager replicationManager;
+  private ReplicationDatanodeStateManager datanodeStateManager;
+
+  @After
+  public void tearDown() throws Exception {
+    logCapturer.stopCapturing();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    Map<DatanodeID, NodeManager.NODESTATE> nodeStateMap = new HashMap<>();
+    // We are setting up 3 pools with 24 nodes each in this cluster.
+    // First we create 72 Datanodes.
+    for (int x = 0; x < MAX_DATANODES; x++) {
+      DatanodeID datanode = SCMTestUtils.getDatanodeID();
+      datanodes.add(datanode);
+      nodeStateMap.put(datanode, NodeManager.NODESTATE.HEALTHY);
+    }
+
+    // All nodes in this cluster are healthy for the time being.
+    nodeManager = new ReplicationNodeManagerMock(nodeStateMap);
+    poolManager = new ReplicationNodePoolManagerMock();
+    commandQueue = new CommandQueue();
+
+    Assert.assertEquals("Max datanodes should be equal to POOL_SIZE * " +
+        "POOL_COUNT", POOL_COUNT * POOL_SIZE, MAX_DATANODES);
+
+    // Assign a disjoint slice of POOL_SIZE datanodes to each pool.
+    for (int y = 1; y <= POOL_COUNT; y++) {
+      String poolName = String.format(POOL_NAME_TEMPLATE, y);
+      for (int z = 0; z < POOL_SIZE; z++) {
+        DatanodeID id = datanodes.get((y - 1) * POOL_SIZE + z);
+        poolManager.addNode(poolName, id);
+      }
+    }
+    OzoneConfiguration config = SCMTestUtils.getOzoneConf();
+    config.setInt(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_SECONDS, 1);
+    replicationManager = new ContainerReplicationManager(config,
+        nodeManager, poolManager, commandQueue);
+    datanodeStateManager = new ReplicationDatanodeStateManager(nodeManager,
+        poolManager);
+    // Sleep for one second to make sure all threads get time to run.
+    sleepUninterruptibly(1, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Asserts that at least one pool is picked up for processing.
+   */
+  @Test
+  public void testAssertPoolsAreProcessed() {
+    // This asserts that the replication manager has started processing at
+    // least one pool.
+    Assert.assertTrue(replicationManager.getInProgressPoolCount() > 0);
+
+    // Since all datanodes are flagged as healthy in this test, for each
+    // datanode we must have queued a command.
+    Assert.assertEquals("Commands are in queue :", commandQueue
+        .getCommandsInQueue(), POOL_SIZE * replicationManager
+        .getInProgressPoolCount());
+  }
+
+  /**
+   * Sends container reports for two containers to a pool in progress and
+   * asserts that we find the container with a single replica and do not
+   * find the container with three replicas.
+   */
+  @Test
+  public void testDetectSingleContainerReplica() throws TimeoutException,
+      InterruptedException {
+    String singleNodeContainer = "SingleNodeContainer";
+    String threeNodeContainer = "ThreeNodeContainer";
+    InProgressPool ppool = replicationManager.getInProcessPoolList().get(0);
+    // Only a single datanode reports that "SingleNodeContainer" exists.
+    List<ContainerReportsProto> clist =
+        datanodeStateManager.getContainerReport(singleNodeContainer,
+            ppool.getPool().getPoolName(), 1);
+    ppool.handleContainerReport(clist.get(0));
+
+    // Three nodes are going to report that ThreeNodeContainer exists.
+    clist = datanodeStateManager.getContainerReport(threeNodeContainer,
+        ppool.getPool().getPoolName(), 3);
+
+    for (ContainerReportsProto reportsProto : clist) {
+      ppool.handleContainerReport(reportsProto);
+    }
+    GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() == 4,
+        200, 1000);
+    ppool.setDoneProcessing();
+
+    List<Map.Entry<String, Integer>> containers = ppool.filterContainer(p -> p
+        .getValue() == 1);
+    Assert.assertEquals(singleNodeContainer, containers.get(0).getKey());
+    int count = containers.get(0).getValue();
+    Assert.assertEquals(1, count);
+  }
+
+  /**
+   * Creates three containers: Normal, OverReplicated and WayOverReplicated.
+   * This test asserts that we are able to find the over-replicated
+   * containers.
+   */
+  @Test
+  public void testDetectOverReplica() throws TimeoutException,
+      InterruptedException {
+    String normalContainer = "NormalContainer";
+    String overReplicated = "OverReplicatedContainer";
+    String wayOverReplicated = "WayOverReplicated";
+    InProgressPool ppool = replicationManager.getInProcessPoolList().get(0);
+
+    List<ContainerReportsProto> clist =
+        datanodeStateManager.getContainerReport(normalContainer,
+            ppool.getPool().getPoolName(), 3);
+    ppool.handleContainerReport(clist.get(0));
+
+    clist = datanodeStateManager.getContainerReport(overReplicated,
+        ppool.getPool().getPoolName(), 4);
+
+    for (ContainerReportsProto reportsProto : clist) {
+      ppool.handleContainerReport(reportsProto);
+    }
+
+    clist = datanodeStateManager.getContainerReport(wayOverReplicated,
+        ppool.getPool().getPoolName(), 7);
+
+    for (ContainerReportsProto reportsProto : clist) {
+      ppool.handleContainerReport(reportsProto);
+    }
+
+    // We ignore duplicate container reports from the same datanode. It is
+    // possible that each of these containers lands on the same datanodes,
+    // so we allow for up to 4 duplicates in the set of 14 reports.
+    GenericTestUtils.waitFor(() -> ppool.getContainerProcessedCount() > 10,
+        200, 1000);
+    ppool.setDoneProcessing();
+
+    List<Map.Entry<String, Integer>> containers = ppool.filterContainer(p -> p
+        .getValue() > 3);
+    Assert.assertEquals(2, containers.size());
+  }
+
+  /**
+   * Verifies that all pools are picked up for replica processing.
+   */
+  @Test
+  public void testAllPoolsAreProcessed() throws TimeoutException,
+      InterruptedException {
+    // Verify that we saw all three pools being picked up for processing.
+    GenericTestUtils.waitFor(() -> replicationManager.getPoolProcessCount()
+        >= 3, 200, 15 * 1000);
+    Assert.assertTrue(logCapturer.getOutput().contains("Pool1") &&
+        logCapturer.getOutput().contains("Pool2") &&
+        logCapturer.getOutput().contains("Pool3"));
+  }
+
+  /**
+   * Adds a new pool and tests that we are able to pick up that new pool for
+   * processing as well as handle container reports for datanodes in that
+   * pool.
+   * @throws TimeoutException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testAddingNewPoolWorks() throws TimeoutException,
+      InterruptedException {
+    LogCapturer inProgressLog = LogCapturer.captureLogs(
+        LogFactory.getLog(InProgressPool.class));
+    GenericTestUtils.setLogLevel(InProgressPool.LOG, Level.ALL);
+    try {
+      DatanodeID id = SCMTestUtils.getDatanodeID();
+      ((ReplicationNodeManagerMock) (nodeManager)).addNode(id, NodeManager
+          .NODESTATE.HEALTHY);
+      poolManager.addNode("PoolNew", id);
+      GenericTestUtils.waitFor(() ->
+              logCapturer.getOutput().contains("PoolNew"),
+          200, 15 * 1000);
+
+      // Assert that we are able to send a container report to this new
+      // pool and datanode.
+      List<ContainerReportsProto> clist =
+          datanodeStateManager.getContainerReport("NewContainer1",
+              "PoolNew", 1);
+      replicationManager.handleContainerReport(clist.get(0));
+      GenericTestUtils.waitFor(() ->
+          inProgressLog.getOutput().contains("NewContainer1") && inProgressLog
+              .getOutput().contains(id.getDatanodeUuid()), 200, 10 * 1000);
+    } finally {
+      inProgressLog.stopCapturing();
+    }
+  }
+}
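
The replica checks in these tests boil down to counting how many datanodes
reported each container and then filtering on that count. A self-contained
sketch of the same idea, independent of the SCM classes and assuming the
usual java.util and java.util.stream imports (the container names and the
single-replica threshold are made up for illustration):

    // Simplified model of the replica-count filtering exercised above.
    Map<String, Integer> replicaCount = new HashMap<>();
    // One entry per container report received from a datanode.
    for (String containerName : Arrays.asList("c1", "c1", "c1", "c2")) {
      replicaCount.merge(containerName, 1, Integer::sum);
    }
    // filterContainer(p -> p.getValue() == 1) corresponds to this filter:
    List<Map.Entry<String, Integer>> singleReplica = replicaCount.entrySet()
        .stream()
        .filter(e -> e.getValue() == 1)
        .collect(Collectors.toList());
    // Only "c2" survives; "c1" was reported by three datanodes.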

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
new file mode 100644
index 0000000..318c54d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+// Test classes for replication.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
index 16f2e3c..c0bebf2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
@@ -39,6 +39,7 @@ import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.Map;
 import java.util.Iterator;
+import java.util.concurrent.TimeoutException;
 
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.TabularData;
@@ -53,12 +54,14 @@ public class TestSCMMXBean {
   private static MBeanServer mbs;
 
   @BeforeClass
-  public static void init() throws IOException {
+  public static void init() throws IOException, TimeoutException,
+      InterruptedException {
     conf = new OzoneConfiguration();
     cluster = new MiniOzoneCluster.Builder(conf)
         .numDataNodes(numOfDatanodes)
         .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
         .build();
+    cluster.waitOzoneReady();
     scm = cluster.getStorageContainerManager();
     mbs = ManagementFactory.getPlatformMBeanServer();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d37ef30/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
index 638de9e..e999ca2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/container/MockNodeManager.java
@@ -252,6 +252,17 @@ public class MockNodeManager implements NodeManager {
   }
 
   /**
+   * Returns the node state of a specific node.
+   *
+   * @param id - DatanodeID
+   * @return Healthy/Stale/Dead.
+   */
+  @Override
+  public NODESTATE getNodeState(DatanodeID id) {
+    return null;
+  }
+
+  /**
    * Closes this stream and releases any system resources associated with it. If
    * the stream is already closed then invoking this method has no effect.
    * <p>

