You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/07/23 17:28:45 UTC
hadoop git commit: HDDS-199. Implement ReplicationManager to handle
underreplication of closed containers. Contributed by Elek Marton.
Repository: hadoop
Updated Branches:
refs/heads/trunk 84d7bf1ee -> 3a9e25edf
HDDS-199. Implement ReplicationManager to handle underreplication of closed containers. Contributed by Elek Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3a9e25ed
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3a9e25ed
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3a9e25ed
Branch: refs/heads/trunk
Commit: 3a9e25edf53187f16ec9f9f6075e850b74b3b91f
Parents: 84d7bf1
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Mon Jul 23 10:13:53 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Mon Jul 23 10:28:33 2018 -0700
----------------------------------------------------------------------
.../apache/hadoop/hdds/scm/ScmConfigKeys.java | 7 +
.../apache/hadoop/ozone/OzoneConfigKeys.java | 1 +
.../common/src/main/resources/ozone-default.xml | 10 +
.../container/replication/ReplicationQueue.java | 76 ------
.../replication/ReplicationRequest.java | 106 --------
.../container/replication/package-info.java | 23 --
.../replication/TestReplicationQueue.java | 134 ----------
.../container/replication/package-info.java | 23 --
.../hadoop/hdds/server/events/EventWatcher.java | 4 +-
.../hadoop/hdds/server/events/TypedEvent.java | 5 +
.../hdds/server/events/TestEventWatcher.java | 6 +-
.../algorithms/ContainerPlacementPolicy.java | 5 +-
.../placement/algorithms/SCMCommonPolicy.java | 8 +-
.../SCMContainerPlacementCapacity.java | 16 +-
.../algorithms/SCMContainerPlacementRandom.java | 7 +-
.../replication/ReplicationCommandWatcher.java | 56 +++++
.../replication/ReplicationManager.java | 242 +++++++++++++++++++
.../container/replication/ReplicationQueue.java | 73 ++++++
.../replication/ReplicationRequest.java | 107 ++++++++
.../scm/container/replication/package-info.java | 23 ++
.../hadoop/hdds/scm/events/SCMEvents.java | 31 +++
.../scm/server/StorageContainerManager.java | 42 +++-
.../TestSCMContainerPlacementCapacity.java | 106 ++++++++
.../TestSCMContainerPlacementRandom.java | 86 +++++++
.../replication/TestReplicationManager.java | 215 ++++++++++++++++
.../replication/TestReplicationQueue.java | 134 ++++++++++
.../scm/container/replication/package-info.java | 23 ++
.../placement/TestContainerPlacement.java | 5 +-
28 files changed, 1192 insertions(+), 382 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 6e940ad..e337d2f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -251,6 +251,13 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_CONTAINER_CLOSE_THRESHOLD =
"ozone.scm.container.close.threshold";
public static final float OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f;
+
+ public static final String HDDS_SCM_WATCHER_TIMEOUT =
+ "hdds.scm.watcher.timeout";
+
+ public static final String HDDS_SCM_WATCHER_TIMEOUT_DEFAULT =
+ "10m";
+
/**
* Never constructed.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 0273677..92f0c41 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+
import org.apache.ratis.util.TimeDuration;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 84a3e0c..6ddf3c6 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1108,4 +1108,14 @@
</description>
</property>
+ <property>
+ <name>hdds.scm.watcher.timeout</name>
+ <value>10m</value>
+ <tag>OZONE, SCM, MANAGEMENT</tag>
+ <description>
+ Timeout for the HDDS SCM CommandWatchers. If a datanode does not confirm
+ completion within this duration, the Copy/Delete container commands are
+ sent to that datanode again.
+ </description>
+ </property>
</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
deleted file mode 100644
index e0a2351..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Queue;
-
-/**
- * Priority queue to handle under-replicated and over replicated containers
- * in ozone. ReplicationManager will consume these messages and decide
- * accordingly.
- */
-public class ReplicationQueue {
-
- private final Queue<ReplicationRequest> queue;
-
- ReplicationQueue() {
- queue = new PriorityQueue<>();
- }
-
- public synchronized boolean add(ReplicationRequest repObj) {
- if (this.queue.contains(repObj)) {
- // Remove the earlier message and insert this one
- this.queue.remove(repObj);
- }
- return this.queue.add(repObj);
- }
-
- public synchronized boolean remove(ReplicationRequest repObj) {
- return queue.remove(repObj);
- }
-
- /**
- * Retrieves, but does not remove, the head of this queue,
- * or returns {@code null} if this queue is empty.
- *
- * @return the head of this queue, or {@code null} if this queue is empty
- */
- public synchronized ReplicationRequest peek() {
- return queue.peek();
- }
-
- /**
- * Retrieves and removes the head of this queue,
- * or returns {@code null} if this queue is empty.
- *
- * @return the head of this queue, or {@code null} if this queue is empty
- */
- public synchronized ReplicationRequest poll() {
- return queue.poll();
- }
-
- public synchronized boolean removeAll(List<ReplicationRequest> repObjs) {
- return queue.removeAll(repObjs);
- }
-
- public int size() {
- return queue.size();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java
deleted file mode 100644
index a6ccce1..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import java.io.Serializable;
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-
-/**
- * Wrapper class for hdds replication queue. Implements its natural
- * ordering for priority queue.
- */
-public class ReplicationRequest implements Comparable<ReplicationRequest>,
- Serializable {
- private final long containerId;
- private final short replicationCount;
- private final short expecReplicationCount;
- private final long timestamp;
-
- public ReplicationRequest(long containerId, short replicationCount,
- long timestamp, short expecReplicationCount) {
- this.containerId = containerId;
- this.replicationCount = replicationCount;
- this.timestamp = timestamp;
- this.expecReplicationCount = expecReplicationCount;
- }
-
- /**
- * Compares this object with the specified object for order. Returns a
- * negative integer, zero, or a positive integer as this object is less
- * than, equal to, or greater than the specified object.
- * @param o the object to be compared.
- * @return a negative integer, zero, or a positive integer as this object
- * is less than, equal to, or greater than the specified object.
- * @throws NullPointerException if the specified object is null
- * @throws ClassCastException if the specified object's type prevents it
- * from being compared to this object.
- */
- @Override
- public int compareTo(ReplicationRequest o) {
- if (o == null) {
- return 1;
- }
- if (this == o) {
- return 0;
- }
- int retVal = Integer
- .compare(getReplicationCount() - getExpecReplicationCount(),
- o.getReplicationCount() - o.getExpecReplicationCount());
- if (retVal != 0) {
- return retVal;
- }
- return Long.compare(getTimestamp(), o.getTimestamp());
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder(91, 1011)
- .append(getContainerId())
- .toHashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- ReplicationRequest that = (ReplicationRequest) o;
- return new EqualsBuilder().append(getContainerId(), that.getContainerId())
- .isEquals();
- }
-
- public long getContainerId() {
- return containerId;
- }
-
- public short getReplicationCount() {
- return replicationCount;
- }
-
- public long getTimestamp() {
- return timestamp;
- }
-
- public short getExpecReplicationCount() {
- return expecReplicationCount;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
deleted file mode 100644
index 7f335e3..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.replication;
-
-/**
- * Ozone Container replicaton related classes.
- */
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
deleted file mode 100644
index 6d74c68..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.replication;
-
-import java.util.Random;
-import java.util.UUID;
-import org.apache.hadoop.util.Time;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test class for ReplicationQueue.
- */
-public class TestReplicationQueue {
-
- private ReplicationQueue replicationQueue;
- private Random random;
-
- @Before
- public void setUp() {
- replicationQueue = new ReplicationQueue();
- random = new Random();
- }
-
- @Test
- public void testDuplicateAddOp() {
- long contId = random.nextLong();
- String nodeId = UUID.randomUUID().toString();
- ReplicationRequest obj1, obj2, obj3;
- long time = Time.monotonicNow();
- obj1 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
- obj2 = new ReplicationRequest(contId, (short) 2, time + 1, (short) 3);
- obj3 = new ReplicationRequest(contId, (short) 1, time+2, (short) 3);
-
- replicationQueue.add(obj1);
- replicationQueue.add(obj2);
- replicationQueue.add(obj3);
- Assert.assertEquals("Should add only 1 msg as second one is duplicate",
- 1, replicationQueue.size());
- ReplicationRequest temp = replicationQueue.poll();
- Assert.assertEquals(temp, obj3);
- }
-
- @Test
- public void testPollOp() {
- long contId = random.nextLong();
- String nodeId = UUID.randomUUID().toString();
- ReplicationRequest msg1, msg2, msg3, msg4, msg5;
- msg1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
- (short) 3);
- long time = Time.monotonicNow();
- msg2 = new ReplicationRequest(contId + 1, (short) 4, time, (short) 3);
- msg3 = new ReplicationRequest(contId + 2, (short) 0, time, (short) 3);
- msg4 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
- // Replication message for same container but different nodeId
- msg5 = new ReplicationRequest(contId + 1, (short) 2, time, (short) 3);
-
- replicationQueue.add(msg1);
- replicationQueue.add(msg2);
- replicationQueue.add(msg3);
- replicationQueue.add(msg4);
- replicationQueue.add(msg5);
- Assert.assertEquals("Should have 3 objects",
- 3, replicationQueue.size());
-
- // Since Priority queue orders messages according to replication count,
- // message with lowest replication should be first
- ReplicationRequest temp;
- temp = replicationQueue.poll();
- Assert.assertEquals("Should have 2 objects",
- 2, replicationQueue.size());
- Assert.assertEquals(temp, msg3);
-
- temp = replicationQueue.poll();
- Assert.assertEquals("Should have 1 objects",
- 1, replicationQueue.size());
- Assert.assertEquals(temp, msg5);
-
- // Message 2 should be ordered before message 5 as both have same replication
- // number but message 2 has earlier timestamp.
- temp = replicationQueue.poll();
- Assert.assertEquals("Should have 0 objects",
- replicationQueue.size(), 0);
- Assert.assertEquals(temp, msg4);
- }
-
- @Test
- public void testRemoveOp() {
- long contId = random.nextLong();
- String nodeId = UUID.randomUUID().toString();
- ReplicationRequest obj1, obj2, obj3;
- obj1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
- (short) 3);
- obj2 = new ReplicationRequest(contId + 1, (short) 2, Time.monotonicNow(),
- (short) 3);
- obj3 = new ReplicationRequest(contId + 2, (short) 3, Time.monotonicNow(),
- (short) 3);
-
- replicationQueue.add(obj1);
- replicationQueue.add(obj2);
- replicationQueue.add(obj3);
- Assert.assertEquals("Should have 3 objects",
- 3, replicationQueue.size());
-
- replicationQueue.remove(obj3);
- Assert.assertEquals("Should have 2 objects",
- 2, replicationQueue.size());
-
- replicationQueue.remove(obj2);
- Assert.assertEquals("Should have 1 objects",
- 1, replicationQueue.size());
-
- replicationQueue.remove(obj1);
- Assert.assertEquals("Should have 0 objects",
- 0, replicationQueue.size());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
deleted file mode 100644
index 5b1fd0f..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-/**
- * SCM Testing and Mocking Utils.
- */
-package org.apache.hadoop.ozone.container.replication;
-// Test classes for Replication functionality.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
index 473c152..38386d4 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
@@ -180,9 +180,9 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
}
- abstract void onTimeout(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
+ protected abstract void onTimeout(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
- abstract void onFinished(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
+ protected abstract void onFinished(EventPublisher publisher, TIMEOUT_PAYLOAD payload);
public List<TIMEOUT_PAYLOAD> getTimeoutEvents(
Predicate<? super TIMEOUT_PAYLOAD> predicate) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
index c2159ad..62e2419 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java
@@ -48,4 +48,9 @@ public class TypedEvent<T> implements Event<T> {
return name;
}
+ @Override
+ public String toString() {
+ return "TypedEvent{" + "payloadType=" + payloadType + ", name='" + name
+ + '\'' + '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
index 8f18478..786b7b8 100644
--- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java
@@ -216,12 +216,12 @@ public class TestEventWatcher {
}
@Override
- void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
+ protected void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
publisher.fireEvent(UNDER_REPLICATED, payload);
}
@Override
- void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
+ protected void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
//Good job. We did it.
}
@@ -231,8 +231,6 @@ public class TestEventWatcher {
}
}
- ;
-
private static class ReplicationCompletedEvent
implements IdentifiableEventPayload {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
index 5d91ac5..3336c8e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
@@ -31,11 +31,14 @@ public interface ContainerPlacementPolicy {
/**
* Given the replication factor and size required, return set of datanodes
* that satisfy the nodes and size requirement.
+ *
+ * @param excludedNodes - list of nodes to be excluded.
* @param nodesRequired - number of datanodes required.
* @param sizeRequired - size required for the container or block.
* @return list of datanodes chosen.
* @throws IOException
*/
- List<DatanodeDetails> chooseDatanodes(int nodesRequired, long sizeRequired)
+ List<DatanodeDetails> chooseDatanodes(List<DatanodeDetails> excludedNodes,
+ int nodesRequired, long sizeRequired)
throws IOException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
index 0a595d5..ba241dc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
@@ -95,16 +95,20 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
* 3. if a set of containers are requested, we either meet the required
* number of nodes or we fail that request.
*
+ *
+ * @param excludedNodes - datanodes to exclude (e.g. with existing replicas)
* @param nodesRequired - number of datanodes required.
* @param sizeRequired - size required for the container or block.
* @return list of datanodes chosen.
* @throws SCMException SCM exception.
*/
- public List<DatanodeDetails> chooseDatanodes(int nodesRequired, final long
- sizeRequired) throws SCMException {
+ public List<DatanodeDetails> chooseDatanodes(
+ List<DatanodeDetails> excludedNodes,
+ int nodesRequired, final long sizeRequired) throws SCMException {
List<DatanodeDetails> healthyNodes =
nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+ healthyNodes.removeAll(excludedNodes);
String msg;
if (healthyNodes.size() == 0) {
msg = "No healthy node found to allocate container.";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
index 85a6b54..8df8f6e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
@@ -17,17 +17,18 @@
package org.apache.hadoop.hdds.scm.container.placement.algorithms;
-import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.List;
-
/**
* Container placement policy that randomly choose datanodes with remaining
* space to satisfy the size constraints.
@@ -83,6 +84,8 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
/**
* Called by SCM to choose datanodes.
*
+ *
+ * @param excludedNodes - list of the datanodes to exclude.
* @param nodesRequired - number of datanodes required.
* @param sizeRequired - size required for the container or block.
* @return List of datanodes.
@@ -90,9 +93,10 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
*/
@Override
public List<DatanodeDetails> chooseDatanodes(
- final int nodesRequired, final long sizeRequired) throws SCMException {
+ List<DatanodeDetails> excludedNodes, final int nodesRequired,
+ final long sizeRequired) throws SCMException {
List<DatanodeDetails> healthyNodes =
- super.chooseDatanodes(nodesRequired, sizeRequired);
+ super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired);
if (healthyNodes.size() == nodesRequired) {
return healthyNodes;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
index 9903c84..76702d5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
@@ -56,6 +56,8 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy
/**
* Choose datanodes called by the SCM to choose the datanode.
*
+ *
+ * @param excludedNodes - list of the datanodes to exclude.
* @param nodesRequired - number of datanodes required.
* @param sizeRequired - size required for the container or block.
* @return List of Datanodes.
@@ -63,9 +65,10 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy
*/
@Override
public List<DatanodeDetails> chooseDatanodes(
- final int nodesRequired, final long sizeRequired) throws SCMException {
+ List<DatanodeDetails> excludedNodes, final int nodesRequired,
+ final long sizeRequired) throws SCMException {
List<DatanodeDetails> healthyNodes =
- super.chooseDatanodes(nodesRequired, sizeRequired);
+ super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired);
if (healthyNodes.size() == nodesRequired) {
return healthyNodes;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java
new file mode 100644
index 0000000..03a81a7
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+ .ReplicationCompleted;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+ .ReplicationRequestToRepeat;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventWatcher;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+
+/**
+ * Watcher for the replication commands sent out by the ReplicationManager.
+ * <p>
+ * The tracked payload is a {@link ReplicationRequestToRepeat}; the payload
+ * which reports completion is a {@link ReplicationCompleted}.
+ */
+public class ReplicationCommandWatcher
+    extends EventWatcher<ReplicationRequestToRepeat, ReplicationCompleted> {
+
+  /**
+   * Creates the watcher; all the arguments are handed over to the
+   * {@link EventWatcher} super class unchanged.
+   *
+   * @param startEvent      event carrying the payloads to track.
+   * @param completionEvent event carrying the completion payloads.
+   * @param leaseManager    lease manager used by the super class.
+   */
+  public ReplicationCommandWatcher(Event<ReplicationRequestToRepeat> startEvent,
+      Event<ReplicationCompleted> completionEvent,
+      LeaseManager<Long> leaseManager) {
+    super(startEvent, completionEvent, leaseManager);
+  }
+
+  /**
+   * Publishes the original replication request again as a
+   * {@link SCMEvents#REPLICATE_CONTAINER} event.
+   *
+   * @param publisher publisher used to fire the event.
+   * @param payload   tracked payload which holds the request to repeat.
+   */
+  @Override
+  protected void onTimeout(EventPublisher publisher,
+      ReplicationRequestToRepeat payload) {
+    // The command did not finish in time: send the embedded request back
+    // to the original queue.
+    ReplicationRequest requestToRepeat = payload.getRequest();
+    publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, requestToRepeat);
+  }
+
+  /**
+   * Does nothing: no follow-up action is taken for a finished command.
+   *
+   * @param publisher unused.
+   * @param payload   unused.
+   */
+  @Override
+  protected void onFinished(EventPublisher publisher,
+      ReplicationRequestToRepeat payload) {
+    // Intentionally empty.
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
new file mode 100644
index 0000000..5f78722
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+ .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents
+ .TRACK_REPLICATE_COMMAND;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Replication Manager manages the replication of the closed containers.
+ * <p>
+ * {@link ReplicationRequest}s received as
+ * {@link SCMEvents#REPLICATE_CONTAINER} events are collected in a
+ * {@link ReplicationQueue}. The worker thread created by {@link #start()}
+ * takes the requests one by one, calculates how many replicas are missing
+ * and sends a {@link ReplicateContainerCommand} to every datanode chosen by
+ * the placement policy. Each command is published as a
+ * {@code TRACK_REPLICATE_COMMAND} event as well, so the
+ * {@link ReplicationCommandWatcher} can track it.
+ */
+public class ReplicationManager implements Runnable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ReplicationManager.class);
+
+  private final ReplicationQueue replicationQueue;
+
+  private final ContainerPlacementPolicy containerPlacement;
+
+  private final EventPublisher eventPublisher;
+
+  private final ReplicationCommandWatcher replicationCommandWatcher;
+
+  /**
+   * Cleared by {@link #stop()}. Volatile, because it is written by the
+   * thread calling stop() and read by the worker thread.
+   */
+  private volatile boolean running = true;
+
+  /**
+   * The worker thread created by {@link #start()}, null before that. Kept
+   * to be able to interrupt a blocking queue read in {@link #stop()}.
+   */
+  private volatile Thread replicationThread;
+
+  private final ContainerStateManager containerStateManager;
+
+  /**
+   * Creates the manager, registers the handler which puts the incoming
+   * replication requests to the internal queue and starts the command
+   * watcher on the event queue. The worker thread is NOT started here, see
+   * {@link #start()}.
+   *
+   * @param containerPlacement         policy to choose the target datanodes.
+   * @param containerStateManager      source of the container information.
+   * @param eventQueue                 queue to receive requests and to
+   *                                   publish commands.
+   * @param commandWatcherLeaseManager lease manager handed over to the
+   *                                   command watcher.
+   */
+  public ReplicationManager(ContainerPlacementPolicy containerPlacement,
+      ContainerStateManager containerStateManager, EventQueue eventQueue,
+      LeaseManager<Long> commandWatcherLeaseManager) {
+
+    this.containerPlacement = containerPlacement;
+    this.containerStateManager = containerStateManager;
+    this.eventPublisher = eventQueue;
+
+    this.replicationCommandWatcher =
+        new ReplicationCommandWatcher(TRACK_REPLICATE_COMMAND,
+            SCMEvents.REPLICATION_COMPLETE, commandWatcherLeaseManager);
+
+    this.replicationQueue = new ReplicationQueue();
+
+    eventQueue.addHandler(SCMEvents.REPLICATE_CONTAINER,
+        (replicationRequest, publisher) -> replicationQueue
+            .add(replicationRequest));
+
+    this.replicationCommandWatcher.start(eventQueue);
+
+  }
+
+  /**
+   * Starts the daemon thread ("Replication Manager") which executes
+   * {@link #run()}.
+   */
+  public void start() {
+
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true)
+        .setNameFormat("Replication Manager").build();
+
+    Thread worker = threadFactory.newThread(this);
+    replicationThread = worker;
+    worker.start();
+  }
+
+  /**
+   * Worker loop: processes the queued replication requests until
+   * {@link #stop()} is called or the thread is interrupted. A failure
+   * during the processing of one request is logged and the loop continues
+   * with the next request.
+   */
+  @Override
+  public void run() {
+
+    while (running) {
+      ReplicationRequest request = null;
+      try {
+        //TODO: add throttling here
+        request = replicationQueue.take();
+
+        final long requestedContainerId = request.getContainerId();
+
+        ContainerID containerID = new ContainerID(requestedContainerId);
+        ContainerInfo containerInfo =
+            containerStateManager.getContainer(containerID);
+
+        Preconditions.checkNotNull(containerInfo,
+            "No information about the container " + requestedContainerId);
+
+        Preconditions
+            .checkState(containerInfo.getState() == LifeCycleState.CLOSED,
+                "Container should be in closed state");
+
+        //check the current replication
+        List<DatanodeDetails> datanodesWithReplicas =
+            getCurrentReplicas(request);
+
+        // Commands tracked by the watcher for the same container are
+        // counted as replicas which are already on the way.
+        int inFlightReplications = replicationCommandWatcher.getTimeoutEvents(
+            tracked ->
+                tracked.getRequest().getContainerId() == requestedContainerId)
+            .size();
+
+        int deficit =
+            request.getExpecReplicationCount() - datanodesWithReplicas.size()
+                - inFlightReplications;
+
+        if (deficit > 0) {
+
+          // The nodes which already have a replica are excluded.
+          List<DatanodeDetails> selectedDatanodes = containerPlacement
+              .chooseDatanodes(datanodesWithReplicas, deficit,
+                  containerInfo.getUsedBytes());
+
+          //send the command
+          for (DatanodeDetails datanode : selectedDatanodes) {
+
+            ReplicateContainerCommand replicateCommand =
+                new ReplicateContainerCommand(containerID.getId(),
+                    datanodesWithReplicas);
+
+            eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+                new CommandForDatanode<>(
+                    datanode.getUuid(), replicateCommand));
+
+            // Track the command by its id together with the original
+            // request, so the request can be repeated by the watcher.
+            ReplicationRequestToRepeat timeoutEvent =
+                new ReplicationRequestToRepeat(replicateCommand.getId(),
+                    request);
+
+            eventPublisher.fireEvent(TRACK_REPLICATE_COMMAND, timeoutEvent);
+
+          }
+
+        } else if (deficit < 0) {
+          //TODO: too many replicas. Not handled yet.
+        }
+
+      } catch (InterruptedException e) {
+        // The blocking take() was interrupted (stop() does this on
+        // purpose). Restore the interrupt status and leave the loop
+        // instead of logging a replication failure for a null request and
+        // blocking on the queue again.
+        Thread.currentThread().interrupt();
+        if (running) {
+          LOG.warn("Replication Manager thread is interrupted, exiting.");
+        }
+        break;
+      } catch (Exception e) {
+        LOG.error("Can't replicate container {}", request, e);
+      }
+    }
+
+  }
+
+  /**
+   * Returns the datanodes which currently have a replica of the container
+   * of the request. The current implementation always returns a new, empty
+   * list (see the TODO in the method body).
+   *
+   * @param request the replication request.
+   * @return the list of the datanodes with replicas.
+   * @throws IOException declared for the future implementations, never
+   *                     thrown by this one.
+   */
+  @VisibleForTesting
+  protected List<DatanodeDetails> getCurrentReplicas(ReplicationRequest request)
+      throws IOException {
+    //TODO: replication information is not yet available after HDDS-175,
+    // should be fixed after HDDS-228
+    return new ArrayList<>();
+  }
+
+  /**
+   * @return the internal queue of the pending replication requests.
+   */
+  @VisibleForTesting
+  public ReplicationQueue getReplicationQueue() {
+    return replicationQueue;
+  }
+
+  /**
+   * Stops the worker loop. The worker thread (if it was started) is
+   * interrupted, as it could be blocked on the empty queue and would never
+   * check the running flag otherwise.
+   */
+  public void stop() {
+    running = false;
+    Thread worker = replicationThread;
+    if (worker != null) {
+      worker.interrupt();
+    }
+  }
+
+  /**
+   * Event for the ReplicationCommandWatcher to repeat the embedded request
+   * in case of timeout.
+   */
+  public static class ReplicationRequestToRepeat
+      implements IdentifiableEventPayload {
+
+    private final long commandId;
+
+    private final ReplicationRequest request;
+
+    /**
+     * @param commandId id of the command sent to the datanode.
+     * @param request   the request which caused the command.
+     */
+    public ReplicationRequestToRepeat(long commandId,
+        ReplicationRequest request) {
+      this.commandId = commandId;
+      this.request = request;
+    }
+
+    /**
+     * @return the original replication request.
+     */
+    public ReplicationRequest getRequest() {
+      return request;
+    }
+
+    /**
+     * @return the id of the tracked command.
+     */
+    @Override
+    public long getId() {
+      return commandId;
+    }
+
+    /**
+     * Two payloads are equal if their embedded requests are equal.
+     * <p>
+     * NOTE(review): the command id is not part of the equality, so the
+     * payloads of different commands sent for the same request are equal.
+     * Confirm that this is what the event watcher expects.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ReplicationRequestToRepeat that = (ReplicationRequestToRepeat) o;
+      return Objects.equals(request, that.request);
+    }
+
+    @Override
+    public int hashCode() {
+      // Consistent with equals: only the request is used.
+      return Objects.hash(request);
+    }
+  }
+
+  /**
+   * Completion payload of a replication command, identified by the id of
+   * the command.
+   */
+  public static class ReplicationCompleted implements IdentifiableEventPayload {
+
+    private final long uuid;
+
+    /**
+     * @param uuid the id returned by {@link #getId()}.
+     */
+    public ReplicationCompleted(long uuid) {
+      this.uuid = uuid;
+    }
+
+    @Override
+    public long getId() {
+      return uuid;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
new file mode 100644
index 0000000..4ca67be
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
+
+/**
+ * Priority queue of the {@link ReplicationRequest}s of the under-replicated
+ * and over-replicated containers in ozone. The ReplicationManager consumes
+ * these requests and decides what to do with them.
+ * <p>
+ * The requests are ordered by their natural ordering; membership checks and
+ * removals use {@code equals}.
+ */
+public class ReplicationQueue {
+
+  private final BlockingQueue<ReplicationRequest> queue;
+
+  /**
+   * Creates an empty queue.
+   */
+  public ReplicationQueue() {
+    this.queue = new PriorityBlockingQueue<>();
+  }
+
+  /**
+   * Adds the request to the queue. An earlier request which is equal to
+   * the new one is replaced by it.
+   *
+   * @param repObj the request to add.
+   * @return the result of the underlying {@code add} call.
+   */
+  public boolean add(ReplicationRequest repObj) {
+    // Drop the earlier message (if there is any) and insert this one.
+    // remove() does nothing when no equal element is queued.
+    queue.remove(repObj);
+    return queue.add(repObj);
+  }
+
+  /**
+   * Removes one request which is equal to the given one.
+   *
+   * @param repObj the request to remove.
+   * @return true if a request was removed.
+   */
+  public boolean remove(ReplicationRequest repObj) {
+    return this.queue.remove(repObj);
+  }
+
+  /**
+   * Returns the head of the queue without removing it.
+   *
+   * @return the head of this queue, or {@code null} if this queue is empty
+   */
+  public ReplicationRequest peek() {
+    return this.queue.peek();
+  }
+
+  /**
+   * Removes and returns the head of the queue; blocks while the queue is
+   * empty.
+   *
+   * @return the head of this queue.
+   * @throws InterruptedException if the thread is interrupted while waiting.
+   */
+  public ReplicationRequest take() throws InterruptedException {
+    return this.queue.take();
+  }
+
+  /**
+   * Removes all the queued requests which are contained in the given list.
+   *
+   * @param repObjs the requests to remove.
+   * @return true if the queue changed as a result of the call.
+   */
+  public boolean removeAll(List<ReplicationRequest> repObjs) {
+    return this.queue.removeAll(repObjs);
+  }
+
+  /**
+   * @return the number of the queued requests.
+   */
+  public int size() {
+    return this.queue.size();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
new file mode 100644
index 0000000..ef7c546
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.io.Serializable;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * Wrapper class for hdds replication queue. Implements its natural
+ * ordering for priority queue.
+ * <p>
+ * Note that the natural ordering is not consistent with equals: the
+ * ordering is based on the replication counts and the timestamp, while
+ * {@link #equals(Object)} and {@link #hashCode()} use the container id
+ * only.
+ */
+public class ReplicationRequest implements Comparable<ReplicationRequest>,
+    Serializable {
+  private final long containerId;
+  private final short replicationCount;
+  private final short expecReplicationCount;
+  private final long timestamp;
+
+  /**
+   * @param containerId           id of the container.
+   * @param replicationCount      replica count of the request.
+   * @param timestamp             timestamp of the request.
+   * @param expecReplicationCount expected replica count.
+   */
+  public ReplicationRequest(long containerId, short replicationCount,
+      long timestamp, short expecReplicationCount) {
+    this.containerId = containerId;
+    this.replicationCount = replicationCount;
+    this.timestamp = timestamp;
+    this.expecReplicationCount = expecReplicationCount;
+  }
+
+  /**
+   * Compares this object with the specified object for order. The request
+   * with the smaller (replicationCount - expecReplicationCount) value is
+   * the smaller one; in case of a tie the request with the smaller
+   * timestamp is the smaller one.
+   * <p>
+   * A {@code null} argument does not cause a NullPointerException: this
+   * object is reported as greater than {@code null}.
+   *
+   * @param o the object to be compared.
+   * @return a negative integer, zero, or a positive integer as this object
+   * is less than, equal to, or greater than the specified object.
+   */
+  @Override
+  public int compareTo(ReplicationRequest o) {
+    if (o == null) {
+      return 1;
+    }
+    if (this == o) {
+      return 0;
+    }
+    // The short values are widened to int, the subtraction can't overflow.
+    int retVal = Integer
+        .compare(getReplicationCount() - getExpecReplicationCount(),
+            o.getReplicationCount() - o.getExpecReplicationCount());
+    if (retVal != 0) {
+      return retVal;
+    }
+    return Long.compare(getTimestamp(), o.getTimestamp());
+  }
+
+  /**
+   * @return hash code based on the container id only.
+   */
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(91, 1011)
+        .append(getContainerId())
+        .toHashCode();
+  }
+
+  /**
+   * Two requests are equal if they are instances of the same class and
+   * have the same container id; the other fields are ignored.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ReplicationRequest that = (ReplicationRequest) o;
+    return new EqualsBuilder().append(getContainerId(), that.getContainerId())
+        .isEquals();
+  }
+
+  /**
+   * @return all the fields of the request in a human readable form, so a
+   * request passed to a log statement is identifiable.
+   */
+  @Override
+  public String toString() {
+    return "ReplicationRequest{"
+        + "containerId=" + containerId
+        + ", replicationCount=" + replicationCount
+        + ", expecReplicationCount=" + expecReplicationCount
+        + ", timestamp=" + timestamp
+        + '}';
+  }
+
+  public long getContainerId() {
+    return containerId;
+  }
+
+  public short getReplicationCount() {
+    return replicationCount;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public short getExpecReplicationCount() {
+    return expecReplicationCount;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
new file mode 100644
index 0000000..934b01e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.container.replication;
+
+/**
+ * HDDS (Closed) Container replication related classes.
+ */
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 46f1588..ad1702b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -28,6 +28,10 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.NodeReportFromDatanode;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+ .ReplicationCompleted;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.TypedEvent;
@@ -129,6 +133,33 @@ public final class SCMEvents {
"DeleteBlockCommandStatus");
/**
+ * This is the command for ReplicationManager to handle under/over
+ * replication. Sent by the ContainerReportHandler after processing the
+ * heartbeat.
+ */
+ public static final TypedEvent<ReplicationRequest> REPLICATE_CONTAINER =
+ new TypedEvent<>(ReplicationRequest.class);
+
+ /**
+ * This event is sent by the ReplicationManager to the
+ * ReplicationCommandWatcher to track the in-progress replication.
+ */
+ public static final TypedEvent<ReplicationManager.ReplicationRequestToRepeat>
+ TRACK_REPLICATE_COMMAND =
+ new TypedEvent<>(ReplicationManager.ReplicationRequestToRepeat.class);
+ /**
+ * This event comes from the Heartbeat dispatcher (in fact from the
+ * datanode) to notify the scm that the replication is done. This is
+ * received by the replicate command watcher to mark in-progress task as
+ * finished.
+ * <p>
+ * TODO: Temporary event, should be replaced by specific Heartbeat
+ * ActionRequired event.
+ */
+ public static final TypedEvent<ReplicationCompleted> REPLICATION_COMPLETE =
+ new TypedEvent<>(ReplicationCompleted.class);
+
+ /**
* Private Ctor. Never Constructed.
*/
private SCMEvents() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index aba6410..f4cd448 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
@@ -38,7 +39,12 @@ import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+ .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+ .SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -61,9 +67,13 @@ import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Storage.StorageState;
import org.apache.hadoop.ozone.common.StorageInfo;
+import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+ .HDDS_SCM_WATCHER_TIMEOUT_DEFAULT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -153,6 +163,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
* Key = DatanodeUuid, value = ContainerStat.
*/
private Cache<String, ContainerStat> containerReportCache;
+ private final ReplicationManager replicationManager;
+ private final LeaseManager<Long> commandWatcherLeaseManager;
/**
* Creates a new StorageContainerManager. Configuration will be updated
@@ -207,6 +219,20 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
+ long watcherTimeout =
+ conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
+ HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+
+ commandWatcherLeaseManager = new LeaseManager<>(watcherTimeout);
+
+ //TODO: support configurable containerPlacement policy
+ ContainerPlacementPolicy containerPlacementPolicy =
+ new SCMContainerPlacementCapacity(scmNodeManager, conf);
+
+ replicationManager = new ReplicationManager(containerPlacementPolicy,
+ scmContainerManager.getStateManager(), eventQueue,
+ commandWatcherLeaseManager);
+
scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
.OZONE_ADMINISTRATORS);
scmUsername = UserGroupInformation.getCurrentUser().getUserName();
@@ -552,7 +578,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
httpServer.start();
scmBlockManager.start();
-
+ replicationManager.start();
setStartTime();
}
@@ -562,6 +588,20 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
public void stop() {
try {
+ LOG.info("Stopping Replication Manager Service.");
+ replicationManager.stop();
+ } catch (Exception ex) {
+ LOG.error("Replication manager service stop failed.", ex);
+ }
+
+ try {
+ LOG.info("Stopping Lease Manager of the command watchers");
+ commandWatcherLeaseManager.shutdown();
+ } catch (Exception ex) {
+ LOG.error("Lease Manager of the command watchers stop failed");
+ }
+
+ try {
LOG.info("Stopping datanode service RPC server");
getDatanodeProtocolServer().stop();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
new file mode 100644
index 0000000..5966f2a
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import org.junit.Assert;
+import org.junit.Test;
+import static org.mockito.Matchers.anyObject;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for the capacity based container placement policy.
+ */
+public class TestSCMContainerPlacementCapacity {
+
+  /**
+   * Chooses one datanode 1000 times from 7 healthy nodes and checks that
+   * the excluded nodes and the node without enough space are never
+   * selected, then compares how often some of the nodes were selected.
+   */
+  @Test
+  public void chooseDatanodes() throws SCMException {
+    //given
+    Configuration conf = new OzoneConfiguration();
+
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (int i = 0; i < 7; i++) {
+      datanodes.add(TestUtils.getDatanodeDetails());
+    }
+
+    NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
+    when(mockNodeManager.getNodes(NodeState.HEALTHY))
+        .thenReturn(new ArrayList<>(datanodes));
+
+    // Default metric first, it is overridden for nodes 2, 3 and 4 below.
+    when(mockNodeManager.getNodeStat(anyObject()))
+        .thenReturn(new SCMNodeMetric(100L, 0L, 100L));
+    when(mockNodeManager.getNodeStat(datanodes.get(2)))
+        .thenReturn(new SCMNodeMetric(100L, 90L, 10L));
+    when(mockNodeManager.getNodeStat(datanodes.get(3)))
+        .thenReturn(new SCMNodeMetric(100L, 80L, 20L));
+    when(mockNodeManager.getNodeStat(datanodes.get(4)))
+        .thenReturn(new SCMNodeMetric(100L, 70L, 30L));
+
+    SCMContainerPlacementCapacity capacityPlacement =
+        new SCMContainerPlacementCapacity(mockNodeManager, conf);
+
+    // Nodes 0 and 1 are passed as excluded nodes.
+    List<DatanodeDetails> existingNodes =
+        new ArrayList<>(datanodes.subList(0, 2));
+
+    Map<DatanodeDetails, Integer> selectedCount = new HashMap<>();
+    datanodes.forEach(datanode -> selectedCount.put(datanode, 0));
+
+    for (int round = 0; round < 1000; round++) {
+
+      //when
+      List<DatanodeDetails> chosenNodes =
+          capacityPlacement.chooseDatanodes(existingNodes, 1, 15);
+
+      //then
+      Assert.assertEquals(1, chosenNodes.size());
+      DatanodeDetails chosen = chosenNodes.get(0);
+
+      Assert.assertNotEquals(
+          "Datanode 0 should not been selected: excluded by parameter",
+          datanodes.get(0), chosen);
+      Assert.assertNotEquals(
+          "Datanode 1 should not been selected: excluded by parameter",
+          datanodes.get(1), chosen);
+      Assert.assertNotEquals(
+          "Datanode 2 should not been selected: not enough space there",
+          datanodes.get(2), chosen);
+
+      int selectedSoFar = selectedCount.get(chosen);
+      selectedCount.put(chosen, selectedSoFar + 1);
+
+    }
+
+    // Nodes 3 and 4 are expected to be selected more often than node 6,
+    // which uses the default metric.
+    // NOTE(review): the original comment said "datanode 4 has less space.
+    // Should be selected less times." - confirm the intended direction
+    // against the SCMNodeMetric constructor argument order.
+    int selectedDefault = selectedCount.get(datanodes.get(6));
+    Assert.assertTrue(selectedCount.get(datanodes.get(3)) > selectedDefault);
+    Assert.assertTrue(selectedCount.get(datanodes.get(4)) > selectedDefault);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java
new file mode 100644
index 0000000..430c181
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import org.junit.Assert;
+import org.junit.Test;
+import static org.mockito.Matchers.anyObject;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+
+public class TestSCMContainerPlacementRandom {
+
+ @Test
+ public void chooseDatanodes() throws SCMException {
+ //given
+ Configuration conf = new OzoneConfiguration();
+
+ List<DatanodeDetails> datanodes = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ datanodes.add(TestUtils.getDatanodeDetails());
+ }
+
+ NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
+ when(mockNodeManager.getNodes(NodeState.HEALTHY))
+ .thenReturn(new ArrayList<>(datanodes));
+
+ when(mockNodeManager.getNodeStat(anyObject()))
+ .thenReturn(new SCMNodeMetric(100l, 0l, 100l));
+ when(mockNodeManager.getNodeStat(datanodes.get(2)))
+ .thenReturn(new SCMNodeMetric(100l, 90l, 10l));
+
+ SCMContainerPlacementRandom scmContainerPlacementRandom =
+ new SCMContainerPlacementRandom(mockNodeManager, conf);
+
+ List<DatanodeDetails> existingNodes = new ArrayList<>();
+ existingNodes.add(datanodes.get(0));
+ existingNodes.add(datanodes.get(1));
+
+ for (int i = 0; i < 100; i++) {
+ //when
+ List<DatanodeDetails> datanodeDetails =
+ scmContainerPlacementRandom.chooseDatanodes(existingNodes, 1, 15);
+
+ //then
+ Assert.assertEquals(1, datanodeDetails.size());
+ DatanodeDetails datanode0Details = datanodeDetails.get(0);
+
+ Assert.assertNotEquals(
+ "Datanode 0 should not been selected: excluded by parameter",
+ datanodes.get(0), datanode0Details);
+ Assert.assertNotEquals(
+ "Datanode 1 should not been selected: excluded by parameter",
+ datanodes.get(1), datanode0Details);
+ Assert.assertNotEquals(
+ "Datanode 2 should not been selected: not enough space there",
+ datanodes.get(2), datanode0Details);
+
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
new file mode 100644
index 0000000..e3e876b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+ .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+ .ReplicationRequestToRepeat;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+
+import com.google.common.base.Preconditions;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents
+ .TRACK_REPLICATE_COMMAND;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import static org.mockito.Matchers.anyObject;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the behaviour of the ReplicationManager.
+ */
+public class TestReplicationManager {
+
+ private EventQueue queue;
+
+ private List<ReplicationRequestToRepeat> trackReplicationEvents;
+
+ private List<CommandForDatanode<ReplicateContainerCommandProto>> copyEvents;
+
+ private ContainerStateManager containerStateManager;
+
+ private ContainerPlacementPolicy containerPlacementPolicy;
+ private List<DatanodeDetails> listOfDatanodeDetails;
+
+ @Before
+ public void initReplicationManager() throws IOException {
+
+ listOfDatanodeDetails = TestUtils.getListOfDatanodeDetails(5);
+
+ containerPlacementPolicy =
+ (excludedNodes, nodesRequired, sizeRequired) -> listOfDatanodeDetails
+ .subList(2, 2 + nodesRequired);
+
+ containerStateManager = Mockito.mock(ContainerStateManager.class);
+
+ //closed container; the tests stub getCurrentReplicas to return 2 replicas
+ ContainerInfo containerInfo = new ContainerInfo.Builder()
+ .setState(LifeCycleState.CLOSED)
+ .build();
+
+ when(containerStateManager.getContainer(anyObject()))
+ .thenReturn(containerInfo);
+
+ queue = new EventQueue();
+
+ trackReplicationEvents = new ArrayList<>();
+ queue.addHandler(TRACK_REPLICATE_COMMAND,
+ (event, publisher) -> trackReplicationEvents.add(event));
+
+ copyEvents = new ArrayList<>();
+ queue.addHandler(SCMEvents.DATANODE_COMMAND,
+ (event, publisher) -> copyEvents.add(event));
+
+ }
+
+ @Test
+ public void testEventSending() throws InterruptedException, IOException {
+
+
+ //GIVEN
+
+ LeaseManager<Long> leaseManager = new LeaseManager<>(100000L);
+ try {
+ leaseManager.start();
+
+ ReplicationManager replicationManager =
+ new ReplicationManager(containerPlacementPolicy,
+ containerStateManager,
+ queue, leaseManager) {
+ @Override
+ protected List<DatanodeDetails> getCurrentReplicas(
+ ReplicationRequest request) throws IOException {
+ return listOfDatanodeDetails.subList(0, 2);
+ }
+ };
+ replicationManager.start();
+
+ //WHEN
+
+ queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+ new ReplicationRequest(1l, (short) 2, System.currentTimeMillis(),
+ (short) 3));
+
+ Thread.sleep(500L);
+ queue.processAll(1000L);
+
+ //THEN
+
+ Assert.assertEquals(1, trackReplicationEvents.size());
+ Assert.assertEquals(1, copyEvents.size());
+ } finally {
+ if (leaseManager != null) {
+ leaseManager.shutdown();
+ }
+ }
+ }
+
+ @Test
+ public void testCommandWatcher() throws InterruptedException, IOException {
+
+ Logger.getRootLogger().setLevel(Level.DEBUG);
+ LeaseManager<Long> leaseManager = new LeaseManager<>(1000L);
+
+ try {
+ leaseManager.start();
+
+ ReplicationManager replicationManager =
+ new ReplicationManager(containerPlacementPolicy, containerStateManager,
+
+
+ queue, leaseManager) {
+ @Override
+ protected List<DatanodeDetails> getCurrentReplicas(
+ ReplicationRequest request) throws IOException {
+ return listOfDatanodeDetails.subList(0, 2);
+ }
+ };
+ replicationManager.start();
+
+ queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+ new ReplicationRequest(1l, (short) 2, System.currentTimeMillis(),
+ (short) 3));
+
+ Thread.sleep(500L);
+
+ queue.processAll(1000L);
+
+ Assert.assertEquals(1, trackReplicationEvents.size());
+ Assert.assertEquals(1, copyEvents.size());
+
+ Assert.assertEquals(trackReplicationEvents.get(0).getId(),
+ copyEvents.get(0).getCommand().getId());
+
+ //sleep past the 1000 ms lease timeout so the tracked event times out
+ Thread.sleep(1500);
+
+ queue.processAll(1000L);
+
+ //original copy command + retry
+ Assert.assertEquals(2, trackReplicationEvents.size());
+ Assert.assertEquals(2, copyEvents.size());
+
+ } finally {
+ if (leaseManager != null) {
+ leaseManager.shutdown();
+ }
+ }
+ }
+
+ public static Pipeline createPipeline(Iterable<DatanodeDetails> ids)
+ throws IOException {
+ Objects.requireNonNull(ids, "ids == null");
+ final Iterator<DatanodeDetails> i = ids.iterator();
+ Preconditions.checkArgument(i.hasNext());
+ final DatanodeDetails leader = i.next();
+ String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(3);
+ final Pipeline pipeline =
+ new Pipeline(leader.getUuidString(), LifeCycleState.OPEN,
+ ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
+ pipeline.addMember(leader);
+ for (; i.hasNext(); ) {
+ pipeline.addMember(i.next());
+ }
+ return pipeline;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java
new file mode 100644
index 0000000..a593718
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.util.Random;
+import java.util.UUID;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for ReplicationQueue.
+ */
+public class TestReplicationQueue {
+
+ private ReplicationQueue replicationQueue;
+ private Random random;
+
+ @Before
+ public void setUp() {
+ replicationQueue = new ReplicationQueue();
+ random = new Random();
+ }
+
+ @Test
+ public void testDuplicateAddOp() throws InterruptedException {
+ long contId = random.nextLong();
+ String nodeId = UUID.randomUUID().toString();
+ ReplicationRequest obj1, obj2, obj3;
+ long time = Time.monotonicNow();
+ obj1 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
+ obj2 = new ReplicationRequest(contId, (short) 2, time + 1, (short) 3);
+ obj3 = new ReplicationRequest(contId, (short) 1, time+2, (short) 3);
+
+ replicationQueue.add(obj1);
+ replicationQueue.add(obj2);
+ replicationQueue.add(obj3);
+ Assert.assertEquals("Should add only 1 msg as second one is duplicate",
+ 1, replicationQueue.size());
+ ReplicationRequest temp = replicationQueue.take();
+ Assert.assertEquals(temp, obj3);
+ }
+
+ @Test
+ public void testPollOp() throws InterruptedException {
+ long contId = random.nextLong();
+ String nodeId = UUID.randomUUID().toString();
+ ReplicationRequest msg1, msg2, msg3, msg4, msg5;
+ msg1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
+ (short) 3);
+ long time = Time.monotonicNow();
+ msg2 = new ReplicationRequest(contId + 1, (short) 4, time, (short) 3);
+ msg3 = new ReplicationRequest(contId + 2, (short) 0, time, (short) 3);
+ msg4 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
+ // Replication message for the same container as msg2 but a different count
+ msg5 = new ReplicationRequest(contId + 1, (short) 2, time, (short) 3);
+
+ replicationQueue.add(msg1);
+ replicationQueue.add(msg2);
+ replicationQueue.add(msg3);
+ replicationQueue.add(msg4);
+ replicationQueue.add(msg5);
+ Assert.assertEquals("Should have 3 objects",
+ 3, replicationQueue.size());
+
+ // Since Priority queue orders messages according to replication count,
+ // message with lowest replication should be first
+ ReplicationRequest temp;
+ temp = replicationQueue.take();
+ Assert.assertEquals("Should have 2 objects",
+ 2, replicationQueue.size());
+ Assert.assertEquals(temp, msg3);
+
+ temp = replicationQueue.take();
+ Assert.assertEquals("Should have 1 objects",
+ 1, replicationQueue.size());
+ Assert.assertEquals(temp, msg5);
+
+ // msg4 and msg5 have the same replication count and the same timestamp;
+ // msg5 is expected to be taken first, so msg4 is the last element left.
+ temp = replicationQueue.take();
+ Assert.assertEquals("Should have 0 objects",
+ replicationQueue.size(), 0);
+ Assert.assertEquals(temp, msg4);
+ }
+
+ @Test
+ public void testRemoveOp() {
+ long contId = random.nextLong();
+ String nodeId = UUID.randomUUID().toString();
+ ReplicationRequest obj1, obj2, obj3;
+ obj1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
+ (short) 3);
+ obj2 = new ReplicationRequest(contId + 1, (short) 2, Time.monotonicNow(),
+ (short) 3);
+ obj3 = new ReplicationRequest(contId + 2, (short) 3, Time.monotonicNow(),
+ (short) 3);
+
+ replicationQueue.add(obj1);
+ replicationQueue.add(obj2);
+ replicationQueue.add(obj3);
+ Assert.assertEquals("Should have 3 objects",
+ 3, replicationQueue.size());
+
+ replicationQueue.remove(obj3);
+ Assert.assertEquals("Should have 2 objects",
+ 2, replicationQueue.size());
+
+ replicationQueue.remove(obj2);
+ Assert.assertEquals("Should have 1 objects",
+ 1, replicationQueue.size());
+
+ replicationQueue.remove(obj1);
+ Assert.assertEquals("Should have 0 objects",
+ 0, replicationQueue.size());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
new file mode 100644
index 0000000..1423c99
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Test classes for the SCM container replication functionality.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+// Test classes for Replication functionality.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java
index 651b776..802f2ef 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.junit.Assert;
import org.junit.Test;
+import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -86,11 +87,11 @@ public class TestContainerPlacement {
for (int x = 0; x < opsCount; x++) {
long containerSize = random.nextInt(100) * OzoneConsts.GB;
List<DatanodeDetails> nodesCapacity =
- capacityPlacer.chooseDatanodes(nodesRequired, containerSize);
+ capacityPlacer.chooseDatanodes(new ArrayList<>(), nodesRequired, containerSize);
assertEquals(nodesRequired, nodesCapacity.size());
List<DatanodeDetails> nodesRandom =
- randomPlacer.chooseDatanodes(nodesRequired, containerSize);
+ randomPlacer.chooseDatanodes(nodesCapacity, nodesRequired, containerSize);
// One fifth of all calls are delete
if (x % 5 == 0) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org