You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2018/07/25 01:32:15 UTC

[32/50] hadoop git commit: HDDS-199. Implement ReplicationManager to handle underreplication of closed containers. Contributed by Elek Marton.

HDDS-199. Implement ReplicationManager to handle underreplication of closed containers. Contributed by Elek Marton.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3a9e25ed
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3a9e25ed
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3a9e25ed

Branch: refs/heads/HADOOP-15461
Commit: 3a9e25edf53187f16ec9f9f6075e850b74b3b91f
Parents: 84d7bf1
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Mon Jul 23 10:13:53 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Mon Jul 23 10:28:33 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdds/scm/ScmConfigKeys.java   |   7 +
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |   1 +
 .../common/src/main/resources/ozone-default.xml |  10 +
 .../container/replication/ReplicationQueue.java |  76 ------
 .../replication/ReplicationRequest.java         | 106 --------
 .../container/replication/package-info.java     |  23 --
 .../replication/TestReplicationQueue.java       | 134 ----------
 .../container/replication/package-info.java     |  23 --
 .../hadoop/hdds/server/events/EventWatcher.java |   4 +-
 .../hadoop/hdds/server/events/TypedEvent.java   |   5 +
 .../hdds/server/events/TestEventWatcher.java    |   6 +-
 .../algorithms/ContainerPlacementPolicy.java    |   5 +-
 .../placement/algorithms/SCMCommonPolicy.java   |   8 +-
 .../SCMContainerPlacementCapacity.java          |  16 +-
 .../algorithms/SCMContainerPlacementRandom.java |   7 +-
 .../replication/ReplicationCommandWatcher.java  |  56 +++++
 .../replication/ReplicationManager.java         | 242 +++++++++++++++++++
 .../container/replication/ReplicationQueue.java |  73 ++++++
 .../replication/ReplicationRequest.java         | 107 ++++++++
 .../scm/container/replication/package-info.java |  23 ++
 .../hadoop/hdds/scm/events/SCMEvents.java       |  31 +++
 .../scm/server/StorageContainerManager.java     |  42 +++-
 .../TestSCMContainerPlacementCapacity.java      | 106 ++++++++
 .../TestSCMContainerPlacementRandom.java        |  86 +++++++
 .../replication/TestReplicationManager.java     | 215 ++++++++++++++++
 .../replication/TestReplicationQueue.java       | 134 ++++++++++
 .../scm/container/replication/package-info.java |  23 ++
 .../placement/TestContainerPlacement.java       |   5 +-
 28 files changed, 1192 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 6e940ad..e337d2f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -251,6 +251,13 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_CONTAINER_CLOSE_THRESHOLD =
       "ozone.scm.container.close.threshold";
   public static final float OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
+
+  public static final String HDDS_SCM_WATCHER_TIMEOUT =
+      "hdds.scm.watcher.timeout";
+
+  public static final String HDDS_SCM_WATCHER_TIMEOUT_DEFAULT =
+      "10m";
+
   /**
    * Never constructed.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 0273677..92f0c41 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+
 import org.apache.ratis.util.TimeDuration;
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 84a3e0c..6ddf3c6 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1108,4 +1108,14 @@
     </description>
   </property>
 
+  <property>
+    <name>hdds.scm.watcher.timeout</name>
+    <value>10m</value>
+    <tag>OZONE, SCM, MANAGEMENT</tag>
+    <description>
+      Timeout for the HDDS SCM CommandWatchers. If a datanode does not
+      confirm completion of a Copy/Delete container command within this
+      duration, the command will be sent to the datanode again.
+    </description>
+  </property>
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
deleted file mode 100644
index e0a2351..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Queue;
-
-/**
- * Priority queue to handle under-replicated and over replicated containers
- * in ozone. ReplicationManager will consume these messages and decide
- * accordingly.
- */
-public class ReplicationQueue {
-
-  private final Queue<ReplicationRequest> queue;
-
-  ReplicationQueue() {
-    queue = new PriorityQueue<>();
-  }
-
-  public synchronized boolean add(ReplicationRequest repObj) {
-    if (this.queue.contains(repObj)) {
-      // Remove the earlier message and insert this one
-      this.queue.remove(repObj);
-    }
-    return this.queue.add(repObj);
-  }
-
-  public synchronized boolean remove(ReplicationRequest repObj) {
-    return queue.remove(repObj);
-  }
-
-  /**
-   * Retrieves, but does not remove, the head of this queue,
-   * or returns {@code null} if this queue is empty.
-   *
-   * @return the head of this queue, or {@code null} if this queue is empty
-   */
-  public synchronized ReplicationRequest peek() {
-    return queue.peek();
-  }
-
-  /**
-   * Retrieves and removes the head of this queue,
-   * or returns {@code null} if this queue is empty.
-   *
-   * @return the head of this queue, or {@code null} if this queue is empty
-   */
-  public synchronized ReplicationRequest poll() {
-    return queue.poll();
-  }
-
-  public synchronized boolean removeAll(List<ReplicationRequest> repObjs) {
-    return queue.removeAll(repObjs);
-  }
-
-  public int size() {
-    return queue.size();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java
deleted file mode 100644
index a6ccce1..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import java.io.Serializable;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-/**
- * Wrapper class for hdds replication queue. Implements its natural
- * ordering for priority queue.
- */
-public class ReplicationRequest implements Comparable<ReplicationRequest>,
-    Serializable {
-  private final long containerId;
-  private final short replicationCount;
-  private final short expecReplicationCount;
-  private final long timestamp;
-
-  public ReplicationRequest(long containerId, short replicationCount,
-      long timestamp, short expecReplicationCount) {
-    this.containerId = containerId;
-    this.replicationCount = replicationCount;
-    this.timestamp = timestamp;
-    this.expecReplicationCount = expecReplicationCount;
-  }
-
-  /**
-   * Compares this object with the specified object for order.  Returns a
-   * negative integer, zero, or a positive integer as this object is less
-   * than, equal to, or greater than the specified object.
-   * @param o the object to be compared.
-   * @return a negative integer, zero, or a positive integer as this object
-   * is less than, equal to, or greater than the specified object.
-   * @throws NullPointerException if the specified object is null
-   * @throws ClassCastException   if the specified object's type prevents it
-   *                              from being compared to this object.
-   */
-  @Override
-  public int compareTo(ReplicationRequest o) {
-    if (o == null) {
-      return 1;
-    }
-    if (this == o) {
-      return 0;
-    }
-    int retVal = Integer
-        .compare(getReplicationCount() - getExpecReplicationCount(),
-            o.getReplicationCount() - o.getExpecReplicationCount());
-    if (retVal != 0) {
-      return retVal;
-    }
-    return Long.compare(getTimestamp(), o.getTimestamp());
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder(91, 1011)
-        .append(getContainerId())
-        .toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    ReplicationRequest that = (ReplicationRequest) o;
-    return new EqualsBuilder().append(getContainerId(), that.getContainerId())
-        .isEquals();
-  }
-
-  public long getContainerId() {
-    return containerId;
-  }
-
-  public short getReplicationCount() {
-    return replicationCount;
-  }
-
-  public long getTimestamp() {
-    return timestamp;
-  }
-
-  public short getExpecReplicationCount() {
-    return expecReplicationCount;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
deleted file mode 100644
index 7f335e3..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.replication;
-
-/**
- * Ozone Container replicaton related classes.
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
deleted file mode 100644
index 6d74c68..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import java.util.Random;
-import java.util.UUID;
-import org.apache.hadoop.util.Time;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test class for ReplicationQueue.
- */
-public class TestReplicationQueue {
-
-  private ReplicationQueue replicationQueue;
-  private Random random;
-
-  @Before
-  public void setUp() {
-    replicationQueue = new ReplicationQueue();
-    random = new Random();
-  }
-
-  @Test
-  public void testDuplicateAddOp() {
-    long contId = random.nextLong();
-    String nodeId = UUID.randomUUID().toString();
-    ReplicationRequest obj1, obj2, obj3;
-    long time = Time.monotonicNow();
-    obj1 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
-    obj2 = new ReplicationRequest(contId, (short) 2, time + 1, (short) 3);
-    obj3 = new ReplicationRequest(contId, (short) 1, time+2, (short) 3);
-
-    replicationQueue.add(obj1);
-    replicationQueue.add(obj2);
-    replicationQueue.add(obj3);
-    Assert.assertEquals("Should add only 1 msg as second one is duplicate",
-        1, replicationQueue.size());
-    ReplicationRequest temp = replicationQueue.poll();
-    Assert.assertEquals(temp, obj3);
-  }
-
-  @Test
-  public void testPollOp() {
-    long contId = random.nextLong();
-    String nodeId = UUID.randomUUID().toString();
-    ReplicationRequest msg1, msg2, msg3, msg4, msg5;
-    msg1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
-        (short) 3);
-    long time = Time.monotonicNow();
-    msg2 = new ReplicationRequest(contId + 1, (short) 4, time, (short) 3);
-    msg3 = new ReplicationRequest(contId + 2, (short) 0, time, (short) 3);
-    msg4 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
-    // Replication message for same container but different nodeId
-    msg5 = new ReplicationRequest(contId + 1, (short) 2, time, (short) 3);
-
-    replicationQueue.add(msg1);
-    replicationQueue.add(msg2);
-    replicationQueue.add(msg3);
-    replicationQueue.add(msg4);
-    replicationQueue.add(msg5);
-    Assert.assertEquals("Should have 3 objects",
-        3, replicationQueue.size());
-
-    // Since Priority queue orders messages according to replication count,
-    // message with lowest replication should be first
-    ReplicationRequest temp;
-    temp = replicationQueue.poll();
-    Assert.assertEquals("Should have 2 objects",
-        2, replicationQueue.size());
-    Assert.assertEquals(temp, msg3);
-
-    temp = replicationQueue.poll();
-    Assert.assertEquals("Should have 1 objects",
-        1, replicationQueue.size());
-    Assert.assertEquals(temp, msg5);
-
-    // Message 2 should be ordered before message 5 as both have same replication
-    // number but message 2 has earlier timestamp.
-    temp = replicationQueue.poll();
-    Assert.assertEquals("Should have 0 objects",
-        replicationQueue.size(), 0);
-    Assert.assertEquals(temp, msg4);
-  }
-
-  @Test
-  public void testRemoveOp() {
-    long contId = random.nextLong();
-    String nodeId = UUID.randomUUID().toString();
-    ReplicationRequest obj1, obj2, obj3;
-    obj1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
-        (short) 3);
-    obj2 = new ReplicationRequest(contId + 1, (short) 2, Time.monotonicNow(),
-        (short) 3);
-    obj3 = new ReplicationRequest(contId + 2, (short) 3, Time.monotonicNow(),
-        (short) 3);
-
-    replicationQueue.add(obj1);
-    replicationQueue.add(obj2);
-    replicationQueue.add(obj3);
-    Assert.assertEquals("Should have 3 objects",
-        3, replicationQueue.size());
-
-    replicationQueue.remove(obj3);
-    Assert.assertEquals("Should have 2 objects",
-        2, replicationQueue.size());
-
-    replicationQueue.remove(obj2);
-    Assert.assertEquals("Should have 1 objects",
-        1, replicationQueue.size());
-
-    replicationQueue.remove(obj1);
-    Assert.assertEquals("Should have 0 objects",
-        0, replicationQueue.size());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
deleted file mode 100644
index 5b1fd0f..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-/**
- * SCM Testing and Mocking Utils.
- */
-package org.apache.hadoop.ozone.container.replication;
-// Test classes for Replication functionality.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
index 473c152..38386d4 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
@@ -180,9 +180,9 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
 
   }
 
-  abstract void onTimeout(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
+  protected abstract void onTimeout(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
 
-  abstract void onFinished(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
+  protected abstract void onFinished(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
 
   public List<TIMEOUT_PAYLOAD> getTimeoutEvents(
       Predicate<? super TIMEOUT_PAYLOAD> predicate) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
index c2159ad..62e2419 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
@@ -48,4 +48,9 @@ public class TypedEvent<T> implements Event<T> {
     return name;
   }
 
+  @Override
+  public String toString() {
+    return "TypedEvent{" + "payloadType=" + payloadType + ", name='" + name
+        + '\'' + '}';
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
index 8f18478..786b7b8 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
@@ -216,12 +216,12 @@ public class TestEventWatcher {
     }
 
     @Override
-    void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
+    protected void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
       publisher.fireEvent(UNDER_REPLICATED, payload);
     }
 
     @Override
-    void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
+    protected void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
       //Good job. We did it.
     }
 
@@ -231,8 +231,6 @@ public class TestEventWatcher {
     }
   }
 
-  ;
-
   private static class ReplicationCompletedEvent
       implements IdentifiableEventPayload {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
index 5d91ac5..3336c8e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
@@ -31,11 +31,14 @@ public interface ContainerPlacementPolicy {
   /**
    * Given the replication factor and size required, return set of datanodes
    * that satisfy the nodes and size requirement.
+   *
+   * @param excludedNodes - list of nodes to be excluded.
    * @param nodesRequired - number of datanodes required.
    * @param sizeRequired - size required for the container or block.
    * @return list of datanodes chosen.
    * @throws IOException
    */
-  List<DatanodeDetails> chooseDatanodes(int nodesRequired, long sizeRequired)
+  List<DatanodeDetails> chooseDatanodes(List<DatanodeDetails> excludedNodes,
+      int nodesRequired, long sizeRequired)
       throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
index 0a595d5..ba241dc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
@@ -95,16 +95,20 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
    * 3. if a set of containers are requested, we either meet the required
    * number of nodes or we fail that request.
    *
+   *
+   * @param excludedNodes - datanodes with existing replicas
    * @param nodesRequired - number of datanodes required.
    * @param sizeRequired - size required for the container or block.
    * @return list of datanodes chosen.
    * @throws SCMException SCM exception.
    */
 
-  public List<DatanodeDetails> chooseDatanodes(int nodesRequired, final long
-      sizeRequired) throws SCMException {
+  public List<DatanodeDetails> chooseDatanodes(
+      List<DatanodeDetails> excludedNodes,
+      int nodesRequired, final long sizeRequired) throws SCMException {
     List<DatanodeDetails> healthyNodes =
         nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    healthyNodes.removeAll(excludedNodes);
     String msg;
     if (healthyNodes.size() == 0) {
       msg = "No healthy node found to allocate container.";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
index 85a6b54..8df8f6e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
@@ -17,17 +17,18 @@
 
 package org.apache.hadoop.hdds.scm.container.placement.algorithms;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.List;
-
 /**
  * Container placement policy that randomly choose datanodes with remaining
  * space to satisfy the size constraints.
@@ -83,6 +84,8 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
   /**
    * Called by SCM to choose datanodes.
    *
+   *
+   * @param excludedNodes - list of the datanodes to exclude.
    * @param nodesRequired - number of datanodes required.
    * @param sizeRequired - size required for the container or block.
    * @return List of datanodes.
@@ -90,9 +93,10 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
    */
   @Override
   public List<DatanodeDetails> chooseDatanodes(
-      final int nodesRequired, final long sizeRequired) throws SCMException {
+      List<DatanodeDetails> excludedNodes, final int nodesRequired,
+      final long sizeRequired) throws SCMException {
     List<DatanodeDetails> healthyNodes =
-        super.chooseDatanodes(nodesRequired, sizeRequired);
+        super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired);
     if (healthyNodes.size() == nodesRequired) {
       return healthyNodes;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
index 9903c84..76702d5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
@@ -56,6 +56,8 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy
   /**
    * Choose datanodes called by the SCM to choose the datanode.
    *
+   *
+   * @param excludedNodes - list of the datanodes to exclude.
    * @param nodesRequired - number of datanodes required.
    * @param sizeRequired - size required for the container or block.
    * @return List of Datanodes.
@@ -63,9 +65,10 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy
    */
   @Override
   public List<DatanodeDetails> chooseDatanodes(
-      final int nodesRequired, final long sizeRequired) throws SCMException {
+      List<DatanodeDetails> excludedNodes, final int nodesRequired,
+      final long sizeRequired) throws SCMException {
     List<DatanodeDetails> healthyNodes =
-        super.chooseDatanodes(nodesRequired, sizeRequired);
+        super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired);
 
     if (healthyNodes.size() == nodesRequired) {
       return healthyNodes;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java
new file mode 100644
index 0000000..03a81a7
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+    .ReplicationCompleted;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+    .ReplicationRequestToRepeat;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventWatcher;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+
+/**
+ * Command watcher to track the replication commands.
+ */
+public class ReplicationCommandWatcher extends
+    EventWatcher<ReplicationRequestToRepeat, ReplicationCompleted> {
+
+  /** Hands the tracked events and the lease manager to the base watcher. */
+  public ReplicationCommandWatcher(Event<ReplicationRequestToRepeat> startEvent,
+      Event<ReplicationCompleted> completionEvent,
+      LeaseManager<Long> leaseManager) {
+    super(startEvent, completionEvent, leaseManager);
+  }
+
+  /** Re-publishes the embedded request as a REPLICATE_CONTAINER event. */
+  @Override
+  protected void onTimeout(EventPublisher publisher,
+      ReplicationRequestToRepeat payload) {
+    // hand the original request back to the replication event stream
+    ReplicationRequest original = payload.getRequest();
+    publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, original);
+  }
+
+  @Override
+  protected void onFinished(EventPublisher publisher,
+      ReplicationRequestToRepeat payload) {
+    // no action is taken by this watcher here
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
new file mode 100644
index 0000000..5f78722
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents
+    .TRACK_REPLICATE_COMMAND;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Replication Manager manages the replication of closed containers.
+ */
+public class ReplicationManager implements Runnable {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplicationManager.class);
+
+  private ReplicationQueue replicationQueue;
+  private ContainerPlacementPolicy containerPlacement;
+  private EventPublisher eventPublisher;
+  private ReplicationCommandWatcher replicationCommandWatcher;
+  private ContainerStateManager containerStateManager;
+
+  // Written by stop() and read by the worker thread, hence volatile.
+  private volatile boolean running = true;
+
+  /**
+   * Subscribes the queue to REPLICATE_CONTAINER and starts the watcher.
+   */
+  public ReplicationManager(ContainerPlacementPolicy containerPlacement,
+      ContainerStateManager containerStateManager, EventQueue eventQueue,
+      LeaseManager<Long> commandWatcherLeaseManager) {
+    this.containerPlacement = containerPlacement;
+    this.containerStateManager = containerStateManager;
+    this.eventPublisher = eventQueue;
+
+    this.replicationCommandWatcher =
+        new ReplicationCommandWatcher(TRACK_REPLICATE_COMMAND,
+            SCMEvents.REPLICATION_COMPLETE, commandWatcherLeaseManager);
+
+    this.replicationQueue = new ReplicationQueue();
+
+    eventQueue.addHandler(SCMEvents.REPLICATE_CONTAINER,
+        (replicationRequest, publisher) -> replicationQueue
+            .add(replicationRequest));
+
+    this.replicationCommandWatcher.start(eventQueue);
+  }
+
+  /** Starts the replication loop on a new daemon thread. */
+  public void start() {
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("Replication Manager").build();
+
+    threadFactory.newThread(this).start();
+  }
+
+  /**
+   * Processes queued requests until stopped or interrupted.
+   */
+  @Override
+  public void run() {
+    while (running) {
+      ReplicationRequest request = null;
+      try {
+        //TODO: add throttling here
+        request = replicationQueue.take();
+
+        ContainerID containerID = new ContainerID(request.getContainerId());
+        ContainerInfo containerInfo =
+            containerStateManager.getContainer(containerID);
+
+        Preconditions.checkNotNull(containerInfo,
+            "No information about the container " + request.getContainerId());
+
+        Preconditions
+            .checkState(containerInfo.getState() == LifeCycleState.CLOSED,
+                "Container should be in closed state");
+
+        //check the current replication
+        List<DatanodeDetails> datanodesWithReplicas =
+            getCurrentReplicas(request);
+
+        ReplicationRequest finalRequest = request;
+        int inFlightReplications = replicationCommandWatcher.getTimeoutEvents(
+            e -> e.request.getContainerId() == finalRequest.getContainerId())
+            .size();
+
+        int deficit =
+            request.getExpecReplicationCount() - datanodesWithReplicas.size()
+                - inFlightReplications;
+
+        if (deficit > 0) {
+          List<DatanodeDetails> selectedDatanodes = containerPlacement
+              .chooseDatanodes(datanodesWithReplicas, deficit,
+                  containerInfo.getUsedBytes());
+
+          //send the command
+          for (DatanodeDetails datanode : selectedDatanodes) {
+            ReplicateContainerCommand replicateCommand =
+                new ReplicateContainerCommand(containerID.getId(),
+                    datanodesWithReplicas);
+
+            eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+                new CommandForDatanode<>(
+                    datanode.getUuid(), replicateCommand));
+
+            ReplicationRequestToRepeat timeoutEvent =
+                new ReplicationRequestToRepeat(replicateCommand.getId(),
+                    request);
+
+            eventPublisher.fireEvent(TRACK_REPLICATE_COMMAND, timeoutEvent);
+          }
+        } else if (deficit < 0) {
+          //TODO: too many replicas. Not handled yet.
+        }
+      } catch (InterruptedException e) {
+        // Do not swallow the interrupt: restore the flag and end the loop.
+        Thread.currentThread().interrupt();
+        LOG.info("Replication Manager thread is interrupted, exiting.");
+        return;
+      } catch (Exception e) {
+        LOG.error("Can't replicate container {}", request, e);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  protected List<DatanodeDetails> getCurrentReplicas(ReplicationRequest request)
+      throws IOException {
+    //TODO: replication information is not yet available after HDDS-175,
+    // should be fixed after HDDS-228
+    return new ArrayList<>();
+  }
+
+  @VisibleForTesting
+  public ReplicationQueue getReplicationQueue() {
+    return replicationQueue;
+  }
+
+  /**
+   * Asks the worker loop to finish. Only the flag is set: a take() call
+   * that is already blocked on the empty queue is not woken up.
+   */
+  public void stop() {
+    running = false;
+  }
+
+  /**
+   * Event for the ReplicationCommandWatcher to repeat the embedded request
+   * in case of timeout.
+   */
+  public static class ReplicationRequestToRepeat
+      implements IdentifiableEventPayload {
+    private final long commandId;
+
+    private final ReplicationRequest request;
+
+    public ReplicationRequestToRepeat(long commandId,
+        ReplicationRequest request) {
+      this.commandId = commandId;
+      this.request = request;
+    }
+
+    public ReplicationRequest getRequest() {
+      return request;
+    }
+
+    @Override
+    public long getId() {
+      return commandId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ReplicationRequestToRepeat that = (ReplicationRequestToRepeat) o;
+      return Objects.equals(request, that.request);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(request);
+    }
+  }
+
+  /** Payload of the event that reports a finished replicate command. */
+  public static class ReplicationCompleted implements IdentifiableEventPayload {
+    private final long uuid;
+
+    public ReplicationCompleted(long uuid) {
+      this.uuid = uuid;
+    }
+
+    @Override
+    public long getId() {
+      return uuid;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
new file mode 100644
index 0000000..4ca67be
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+
+/**
+ * Priority queue to handle under-replicated and over replicated containers
+ * in ozone. ReplicationManager will consume these messages and decide
+ * accordingly.
+ */
+public class ReplicationQueue {
+
+  private final BlockingQueue<ReplicationRequest> queue =
+      new PriorityBlockingQueue<>();
+
+  /**
+   * Queues the request; an equal request that is already queued is
+   * removed first, so the new one replaces it.
+   * @return the result of the insertion into the underlying queue
+   */
+  public boolean add(ReplicationRequest repObj) {
+    // remove() simply returns false when no equal request is queued
+    queue.remove(repObj);
+    return queue.add(repObj);
+  }
+
+  /** Removes one queued request equal to the argument, if there is any. */
+  public boolean remove(ReplicationRequest repObj) {
+    return queue.remove(repObj);
+  }
+
+  /**
+   * Returns the head of the queue without removing it.
+   *
+   * @return the head of this queue, or {@code null} if this queue is empty
+   */
+  public ReplicationRequest peek() {
+    return queue.peek();
+  }
+
+  /** Removes and returns the head of the queue, waiting while it is empty. */
+  public ReplicationRequest take() throws InterruptedException {
+    return queue.take();
+  }
+
+  /** Removes every queued request that is contained in the given list. */
+  public boolean removeAll(List<ReplicationRequest> repObjs) {
+    return queue.removeAll(repObjs);
+  }
+
+  /** Returns the number of queued requests. */
+  public int size() {
+    return queue.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
new file mode 100644
index 0000000..ef7c546
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.io.Serializable;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * Wrapper class for hdds replication queue. Implements its natural
+ * ordering for priority queue.
+ */
+public class ReplicationRequest implements Comparable<ReplicationRequest>,
+    Serializable {
+  private final long containerId;
+  private final short replicationCount;
+  private final short expecReplicationCount;
+  private final long timestamp;
+
+  /** Creates a request; note that the timestamp precedes the expected count. */
+  public ReplicationRequest(long containerId, short replicationCount,
+      long timestamp, short expecReplicationCount) {
+    this.containerId = containerId;
+    this.replicationCount = replicationCount;
+    this.timestamp = timestamp;
+    this.expecReplicationCount = expecReplicationCount;
+  }
+
+  /**
+   * Orders the requests by the difference between the actual and the
+   * expected replica count (smaller difference first); requests with the
+   * same difference are ordered by timestamp (smaller timestamp first).
+   * A {@code null} argument does not raise an exception: every request is
+   * greater than {@code null}.
+   *
+   * @param o the request to be compared.
+   * @return a negative integer, zero, or a positive integer as this request
+   * is less than, equal to, or greater than the specified request.
+   */
+  @Override
+  public int compareTo(ReplicationRequest o) {
+    if (o == null) {
+      return 1;
+    }
+    if (this == o) {
+      return 0;
+    }
+    // short operands are promoted to int, so the subtraction cannot overflow
+    int ownDelta = getReplicationCount() - getExpecReplicationCount();
+    int otherDelta = o.getReplicationCount() - o.getExpecReplicationCount();
+    int byDelta = Integer.compare(ownDelta, otherDelta);
+    return byDelta != 0
+        ? byDelta : Long.compare(getTimestamp(), o.getTimestamp());
+  }
+
+  /** Hashes the container id only, in line with {@link #equals(Object)}. */
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(91, 1011)
+        .append(getContainerId())
+        .toHashCode();
+  }
+
+  /** Requests of the same class are equal when their container ids match. */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    boolean sameClass = o != null && getClass() == o.getClass();
+    return sameClass && new EqualsBuilder()
+        .append(getContainerId(), ((ReplicationRequest) o).getContainerId())
+        .isEquals();
+  }
+
+  public long getContainerId() {
+    return containerId;
+  }
+
+  public short getReplicationCount() {
+    return replicationCount;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public short getExpecReplicationCount() {
+    return expecReplicationCount;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
new file mode 100644
index 0000000..934b01e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication;
+
+/**
+ * HDDS (Closed) Container replication related classes.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 46f1588..ad1702b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -28,6 +28,10 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .ContainerReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .NodeReportFromDatanode;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+    .ReplicationCompleted;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
 
 import org.apache.hadoop.hdds.server.events.Event;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
@@ -129,6 +133,33 @@ public final class SCMEvents {
           "DeleteBlockCommandStatus");
 
   /**
+   * This is the command for ReplicationManager to handle under/over
+   * replication. Sent by the ContainerReportHandler after processing the
+   * heartbeat.
+   */
+  public static final TypedEvent<ReplicationRequest> REPLICATE_CONTAINER =
+      new TypedEvent<>(ReplicationRequest.class);
+
+  /**
+   * This event is sent by the ReplicationManager to the
+   * ReplicationCommandWatcher to track the in-progress replication.
+   */
+  public static final TypedEvent<ReplicationManager.ReplicationRequestToRepeat>
+      TRACK_REPLICATE_COMMAND =
+      new TypedEvent<>(ReplicationManager.ReplicationRequestToRepeat.class);
+  /**
+   * This event comes from the Heartbeat dispatcher (in fact from the
+   * datanode) to notify the scm that the replication is done. This is
+   * received by the replicate command watcher to mark in-progress task as
+   * finished.
+   * <p>
+   * TODO: Temporary event, should be replaced by specific Heartbeat
+   * ActionRequired event.
+   */
+  public static final TypedEvent<ReplicationCompleted> REPLICATION_COMPLETE =
+      new TypedEvent<>(ReplicationCompleted.class);
+
+  /**
    * Private Ctor. Never Constructed.
    */
   private SCMEvents() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index aba6410..f4cd448 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
 import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
@@ -38,7 +39,12 @@ import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerMapping;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -61,9 +67,13 @@ import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
 import org.apache.hadoop.ozone.common.StorageInfo;
+import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.StringUtils;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -153,6 +163,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
    * Key = DatanodeUuid, value = ContainerStat.
    */
   private Cache<String, ContainerStat> containerReportCache;
+  private final ReplicationManager replicationManager;
+  private final LeaseManager<Long> commandWatcherLeaseManager;
 
   /**
    * Creates a new StorageContainerManager. Configuration will be updated
@@ -207,6 +219,20 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
     eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
 
+    long watcherTimeout =
+        conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
+            HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+
+    commandWatcherLeaseManager = new LeaseManager<>(watcherTimeout);
+
+    //TODO: support configurable containerPlacement policy
+    ContainerPlacementPolicy containerPlacementPolicy =
+        new SCMContainerPlacementCapacity(scmNodeManager, conf);
+
+    replicationManager = new ReplicationManager(containerPlacementPolicy,
+        scmContainerManager.getStateManager(), eventQueue,
+        commandWatcherLeaseManager);
+
     scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
         .OZONE_ADMINISTRATORS);
     scmUsername = UserGroupInformation.getCurrentUser().getUserName();
@@ -552,7 +578,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
 
     httpServer.start();
     scmBlockManager.start();
-
+    replicationManager.start();
     setStartTime();
   }
 
@@ -562,6 +588,20 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
   public void stop() {
 
     try {
+      LOG.info("Stopping Replication Manager Service.");
+      replicationManager.stop();
+    } catch (Exception ex) {
+      LOG.error("Replication manager service stop failed.", ex);
+    }
+
+    try {
+      LOG.info("Stopping Lease Manager of the command watchers");
+      commandWatcherLeaseManager.shutdown();
+    } catch (Exception ex) {
+      LOG.error("Lease Manager of the command watchers stop failed");
+    }
+
+    try {
       LOG.info("Stopping datanode service RPC server");
       getDatanodeProtocolServer().stop();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
new file mode 100644
index 0000000..5966f2a
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import org.junit.Assert;
+import org.junit.Test;
+import static org.mockito.Matchers.anyObject;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+
+/** Test for the datanode selection of SCMContainerPlacementCapacity. */
+public class TestSCMContainerPlacementCapacity {
+  @Test
+  public void chooseDatanodes() throws SCMException {
+    //given
+    Configuration conf = new OzoneConfiguration();
+
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (int i = 0; i < 7; i++) {
+      datanodes.add(TestUtils.getDatanodeDetails());
+    }
+
+    NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
+    when(mockNodeManager.getNodes(NodeState.HEALTHY))
+        .thenReturn(new ArrayList<>(datanodes));
+
+    when(mockNodeManager.getNodeStat(anyObject()))
+        .thenReturn(new SCMNodeMetric(100L, 0L, 100L));
+    when(mockNodeManager.getNodeStat(datanodes.get(2)))
+        .thenReturn(new SCMNodeMetric(100L, 90L, 10L));
+    when(mockNodeManager.getNodeStat(datanodes.get(3)))
+        .thenReturn(new SCMNodeMetric(100L, 80L, 20L));
+    when(mockNodeManager.getNodeStat(datanodes.get(4)))
+        .thenReturn(new SCMNodeMetric(100L, 70L, 30L));
+
+    SCMContainerPlacementCapacity placementPolicy =
+        new SCMContainerPlacementCapacity(mockNodeManager, conf);
+
+    List<DatanodeDetails> existingNodes = new ArrayList<>();
+    existingNodes.add(datanodes.get(0));
+    existingNodes.add(datanodes.get(1));
+
+    Map<DatanodeDetails, Integer> selectedCount = new HashMap<>();
+    for (DatanodeDetails datanode : datanodes) {
+      selectedCount.put(datanode, 0);
+    }
+
+    for (int i = 0; i < 1000; i++) {
+      //when
+      List<DatanodeDetails> datanodeDetails =
+          placementPolicy.chooseDatanodes(existingNodes, 1, 15);
+
+      //then
+      Assert.assertEquals(1, datanodeDetails.size());
+      DatanodeDetails datanode0Details = datanodeDetails.get(0);
+
+      Assert.assertNotEquals(
+          "Datanode 0 should not been selected: excluded by parameter",
+          datanodes.get(0), datanode0Details);
+      Assert.assertNotEquals(
+          "Datanode 1 should not been selected: excluded by parameter",
+          datanodes.get(1), datanode0Details);
+      Assert.assertNotEquals(
+          "Datanode 2 should not been selected: not enough space there",
+          datanodes.get(2), datanode0Details);
+
+      selectedCount.merge(datanode0Details, 1, Integer::sum);
+    }
+
+    // NOTE(review): datanodes 3 and 4 are stubbed as fuller than datanode 6
+    // (presumably used=80/70 vs 0), yet they are expected to be chosen more
+    // often - confirm that this is the intended direction of the policy.
+    Assert.assertTrue(selectedCount.get(datanodes.get(3)) > selectedCount
+        .get(datanodes.get(6)));
+    Assert.assertTrue(selectedCount.get(datanodes.get(4)) > selectedCount
+        .get(datanodes.get(6)));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java
new file mode 100644
index 0000000..430c181
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import org.junit.Assert;
+import org.junit.Test;
+import static org.mockito.Matchers.anyObject;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+
+/** Checks node exclusion and space filtering of the random placement. */
+public class TestSCMContainerPlacementRandom {
+
+  @Test
+  public void chooseDatanodes() throws SCMException {
+    //given: five datanodes reported as healthy by a mocked NodeManager
+    Configuration conf = new OzoneConfiguration();
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    while (datanodes.size() < 5) {
+      datanodes.add(TestUtils.getDatanodeDetails());
+    }
+
+    NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
+    when(mockNodeManager.getNodes(NodeState.HEALTHY))
+        .thenReturn(new ArrayList<>(datanodes));
+
+    // Catch-all stat stub first, then a dedicated stub for datanode 2.
+    when(mockNodeManager.getNodeStat(anyObject()))
+        .thenReturn(new SCMNodeMetric(100L, 0L, 100L));
+    when(mockNodeManager.getNodeStat(datanodes.get(2)))
+        .thenReturn(new SCMNodeMetric(100L, 90L, 10L));
+
+    SCMContainerPlacementRandom scmContainerPlacementRandom =
+        new SCMContainerPlacementRandom(mockNodeManager, conf);
+
+    // Datanodes 0 and 1 are handed to the policy as nodes to exclude.
+    List<DatanodeDetails> existingNodes =
+        new ArrayList<>(datanodes.subList(0, 2));
+
+    // Repeat the selection; a single pick could pass by chance.
+    for (int round = 0; round < 100; round++) {
+      //when
+      List<DatanodeDetails> chosen =
+          scmContainerPlacementRandom.chooseDatanodes(existingNodes, 1, 15);
+
+      //then
+      Assert.assertEquals(1, chosen.size());
+      DatanodeDetails picked = chosen.get(0);
+      Assert.assertNotEquals(
+          "Datanode 0 should not been selected: excluded by parameter",
+          datanodes.get(0), picked);
+      Assert.assertNotEquals(
+          "Datanode 1 should not been selected: excluded by parameter",
+          datanodes.get(1), picked);
+      Assert.assertNotEquals(
+          "Datanode 2 should not been selected: not enough space there",
+          datanodes.get(2), picked);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
new file mode 100644
index 0000000..e3e876b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+    .ReplicationRequestToRepeat;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+
+import com.google.common.base.Preconditions;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents
+    .TRACK_REPLICATE_COMMAND;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Matchers.anyObject;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test behaviour of the ReplicationManager.
+ */
+public class TestReplicationManager {
+
+  private EventQueue queue;
+
+  /** Payloads received on TRACK_REPLICATE_COMMAND. */
+  private List<ReplicationRequestToRepeat> trackReplicationEvents;
+
+  /** Payloads received on SCMEvents.DATANODE_COMMAND. */
+  private List<CommandForDatanode<ReplicateContainerCommandProto>> copyEvents;
+
+  private ContainerStateManager containerStateManager;
+
+  private ContainerPlacementPolicy containerPlacementPolicy;
+  private List<DatanodeDetails> listOfDatanodeDetails;
+
+  /**
+   * Builds the shared fixtures: five datanodes, a stub placement policy, a
+   * mocked container state manager and an event queue whose handlers record
+   * every tracking event and datanode command they receive.
+   */
+  @Before
+  public void initReplicationManager() throws IOException {
+    listOfDatanodeDetails = TestUtils.getListOfDatanodeDetails(5);
+
+    // Stub policy: ignores exclusions and size, answers from index 2 on.
+    containerPlacementPolicy =
+        (excludedNodes, nodesRequired, sizeRequired) ->
+            listOfDatanodeDetails.subList(2, 2 + nodesRequired);
+
+    // Every container lookup on the mock yields the same CLOSED container.
+    ContainerInfo closedContainer = new ContainerInfo.Builder()
+        .setState(LifeCycleState.CLOSED)
+        .build();
+    containerStateManager = Mockito.mock(ContainerStateManager.class);
+    when(containerStateManager.getContainer(anyObject()))
+        .thenReturn(closedContainer);
+
+    queue = new EventQueue();
+
+    trackReplicationEvents = new ArrayList<>();
+    queue.addHandler(TRACK_REPLICATE_COMMAND,
+        (event, publisher) -> trackReplicationEvents.add(event));
+
+    copyEvents = new ArrayList<>();
+    queue.addHandler(SCMEvents.DATANODE_COMMAND,
+        (event, publisher) -> copyEvents.add(event));
+  }
+
+  /**
+   * Creates and starts a ReplicationManager wired to the shared fixtures.
+   * Its replica lookup is overridden to always report datanodes 0 and 1.
+   * The throws clause mirrors the one declared by the calling tests.
+   */
+  private void startReplicationManager(LeaseManager<Long> leaseManager)
+      throws InterruptedException, IOException {
+    ReplicationManager replicationManager =
+        new ReplicationManager(containerPlacementPolicy,
+            containerStateManager, queue, leaseManager) {
+          @Override
+          protected List<DatanodeDetails> getCurrentReplicas(
+              ReplicationRequest request) throws IOException {
+            return listOfDatanodeDetails.subList(0, 2);
+          }
+        };
+    replicationManager.start();
+  }
+
+  /**
+   * Fires one replication request and expects exactly one tracking event
+   * and one datanode command after the queue has been processed.
+   */
+  @Test
+  public void testEventSending() throws InterruptedException, IOException {
+    //GIVEN
+    LeaseManager<Long> leaseManager = new LeaseManager<>(100000L);
+    try {
+      leaseManager.start();
+      startReplicationManager(leaseManager);
+
+      //WHEN
+      ReplicationRequest request = new ReplicationRequest(
+          1L, (short) 2, System.currentTimeMillis(), (short) 3);
+      queue.fireEvent(SCMEvents.REPLICATE_CONTAINER, request);
+
+      Thread.sleep(500L);
+      queue.processAll(1000L);
+
+      //THEN
+      Assert.assertEquals(1, trackReplicationEvents.size());
+      Assert.assertEquals(1, copyEvents.size());
+    } finally {
+      leaseManager.shutdown();
+    }
+  }
+
+  /**
+   * Fires one request with a much smaller lease value (1000L) and never
+   * confirms it; after sleeping past that value a retry is expected, i.e.
+   * a second tracking event and a second datanode command.
+   */
+  @Test
+  public void testCommandWatcher() throws InterruptedException, IOException {
+    Logger.getRootLogger().setLevel(Level.DEBUG);
+    LeaseManager<Long> leaseManager = new LeaseManager<>(1000L);
+    try {
+      leaseManager.start();
+      startReplicationManager(leaseManager);
+
+      ReplicationRequest request = new ReplicationRequest(
+          1L, (short) 2, System.currentTimeMillis(), (short) 3);
+      queue.fireEvent(SCMEvents.REPLICATE_CONTAINER, request);
+
+      Thread.sleep(500L);
+      queue.processAll(1000L);
+
+      Assert.assertEquals(1, trackReplicationEvents.size());
+      Assert.assertEquals(1, copyEvents.size());
+
+      // The tracking event and the datanode command must share one id.
+      Assert.assertEquals(trackReplicationEvents.get(0).getId(),
+          copyEvents.get(0).getCommand().getId());
+
+      //event is timed out
+      Thread.sleep(1500);
+      queue.processAll(1000L);
+
+      //original copy command + retry
+      Assert.assertEquals(2, trackReplicationEvents.size());
+      Assert.assertEquals(2, copyEvents.size());
+    } finally {
+      leaseManager.shutdown();
+    }
+  }
+
+  /**
+   * Builds an OPEN, STAND_ALONE, factor ONE pipeline with a random
+   * "TEST-" name. The first datanode is treated as the leader and every
+   * datanode of {@code ids} is added as a member.
+   */
+  public static Pipeline createPipeline(Iterable<DatanodeDetails> ids)
+      throws IOException {
+    Objects.requireNonNull(ids, "ids == null");
+    final Iterator<DatanodeDetails> members = ids.iterator();
+    Preconditions.checkArgument(members.hasNext());
+    final DatanodeDetails leader = members.next();
+    final String suffix = UUID.randomUUID().toString().substring(3);
+    final Pipeline pipeline = new Pipeline(leader.getUuidString(),
+        LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
+        ReplicationFactor.ONE, "TEST-" + suffix);
+    pipeline.addMember(leader);
+    while (members.hasNext()) {
+      pipeline.addMember(members.next());
+    }
+    return pipeline;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java
new file mode 100644
index 0000000..a593718
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.util.Random;
+import java.util.UUID;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for ReplicationQueue.
+ */
+public class TestReplicationQueue {
+
+  private ReplicationQueue replicationQueue;
+  private Random random;
+
+  @Before
+  public void setUp() {
+    replicationQueue = new ReplicationQueue();
+    random = new Random();
+  }
+
+  @Test
+  public void testDuplicateAddOp() throws InterruptedException {
+    long contId = random.nextLong();
+    // All three requests share one container id; one entry is expected.
+    ReplicationRequest obj1, obj2, obj3;
+    long time = Time.monotonicNow();
+    obj1 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
+    obj2 = new ReplicationRequest(contId, (short) 2, time + 1, (short) 3);
+    obj3 = new ReplicationRequest(contId, (short) 1, time + 2, (short) 3);
+
+    replicationQueue.add(obj1);
+    replicationQueue.add(obj2);
+    replicationQueue.add(obj3);
+    Assert.assertEquals("Should add only 1 msg as second one is duplicate",
+        1, replicationQueue.size());
+    ReplicationRequest temp = replicationQueue.take();
+    Assert.assertEquals(obj3, temp);
+  }
+
+  @Test
+  public void testPollOp() throws InterruptedException {
+    long contId = random.nextLong();
+    // Five requests over three container ids; three entries are expected.
+    ReplicationRequest msg1, msg2, msg3, msg4, msg5;
+    msg1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
+        (short) 3);
+    long time = Time.monotonicNow();
+    msg2 = new ReplicationRequest(contId + 1, (short) 4, time, (short) 3);
+    msg3 = new ReplicationRequest(contId + 2, (short) 0, time, (short) 3);
+    msg4 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
+    // Replication message for the same container as msg2
+    msg5 = new ReplicationRequest(contId + 1, (short) 2, time, (short) 3);
+
+    replicationQueue.add(msg1);
+    replicationQueue.add(msg2);
+    replicationQueue.add(msg3);
+    replicationQueue.add(msg4);
+    replicationQueue.add(msg5);
+    Assert.assertEquals("Should have 3 objects",
+        3, replicationQueue.size());
+
+    // The queue is expected to hand out the request with the lowest
+    // replication count first, which is msg3 (count 0).
+    ReplicationRequest temp;
+    temp = replicationQueue.take();
+    Assert.assertEquals("Should have 2 objects",
+        2, replicationQueue.size());
+    Assert.assertEquals(msg3, temp);
+
+    temp = replicationQueue.take();
+    Assert.assertEquals("Should have 1 objects",
+        1, replicationQueue.size());
+    Assert.assertEquals(msg5, temp);
+
+    // msg5 and msg4 carry the same replication count and timestamp; the
+    // test expects msg5 (taken above) first and msg4 last.
+    temp = replicationQueue.take();
+    Assert.assertEquals("Should have 0 objects",
+        0, replicationQueue.size());
+    Assert.assertEquals(msg4, temp);
+  }
+
+  @Test
+  public void testRemoveOp() {
+    long contId = random.nextLong();
+    // Three distinct container ids, so three entries are expected.
+    ReplicationRequest obj1, obj2, obj3;
+    obj1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
+        (short) 3);
+    obj2 = new ReplicationRequest(contId + 1, (short) 2, Time.monotonicNow(),
+        (short) 3);
+    obj3 = new ReplicationRequest(contId + 2, (short) 3, Time.monotonicNow(),
+        (short) 3);
+
+    replicationQueue.add(obj1);
+    replicationQueue.add(obj2);
+    replicationQueue.add(obj3);
+    Assert.assertEquals("Should have 3 objects",
+        3, replicationQueue.size());
+
+    replicationQueue.remove(obj3);
+    Assert.assertEquals("Should have 2 objects",
+        2, replicationQueue.size());
+
+    replicationQueue.remove(obj2);
+    Assert.assertEquals("Should have 1 objects",
+        1, replicationQueue.size());
+
+    replicationQueue.remove(obj1);
+    Assert.assertEquals("Should have 0 objects",
+        0, replicationQueue.size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
new file mode 100644
index 0000000..1423c99
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Test classes for Replication functionality.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+// Test classes for Replication functionality.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java
index 651b776..802f2ef 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
@@ -86,11 +87,11 @@ public class TestContainerPlacement {
     for (int x = 0; x < opsCount; x++) {
       long containerSize = random.nextInt(100) * OzoneConsts.GB;
       List<DatanodeDetails> nodesCapacity =
-          capacityPlacer.chooseDatanodes(nodesRequired, containerSize);
+          capacityPlacer.chooseDatanodes(new ArrayList<>(), nodesRequired, containerSize);
       assertEquals(nodesRequired, nodesCapacity.size());
 
       List<DatanodeDetails> nodesRandom =
-          randomPlacer.chooseDatanodes(nodesRequired, containerSize);
+          randomPlacer.chooseDatanodes(nodesCapacity, nodesRequired, containerSize);
 
       // One fifth of all calls are delete
       if (x % 5 == 0) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org