You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/07/02 20:32:29 UTC
[12/45] hadoop git commit: HDDS-186. Create under replicated queue.
Contributed by Ajay Kumar.
HDDS-186. Create under replicated queue. Contributed by Ajay Kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e9ec3d78
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e9ec3d78
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e9ec3d78
Branch: refs/heads/HDDS-4
Commit: e9ec3d78f520a8543dc77d763d4b358aa608bae8
Parents: 56a4cdb
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Wed Jun 27 13:35:30 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Wed Jun 27 13:35:30 2018 -0700
----------------------------------------------------------------------
.../container/replication/ReplicationQueue.java | 76 +++++++++++
.../replication/ReplicationRequest.java | 106 +++++++++++++++
.../container/replication/package-info.java | 23 ++++
.../replication/TestReplicationQueue.java | 134 +++++++++++++++++++
.../container/replication/package-info.java | 23 ++++
5 files changed, 362 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9ec3d78/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
new file mode 100644
index 0000000..e0a2351
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+/**
+ * Priority queue to handle under-replicated and over replicated containers
+ * in ozone. ReplicationManager will consume these messages and decide
+ * accordingly.
+ *
+ * <p>Requests are ordered by the natural ordering of
+ * {@link ReplicationRequest}. At most one request per equal
+ * {@link ReplicationRequest} is kept: adding a request that equals a queued
+ * one replaces the queued one.
+ *
+ * <p>Every public method synchronizes on this instance, because the backing
+ * {@link PriorityQueue} is not thread-safe on its own.
+ */
+public class ReplicationQueue {
+
+  private final Queue<ReplicationRequest> queue;
+
+  ReplicationQueue() {
+    queue = new PriorityQueue<>();
+  }
+
+  /**
+   * Adds the request to the queue, replacing any queued request that is
+   * equal to it.
+   *
+   * @param repObj request to add
+   * @return the result of the underlying {@link Queue#add(Object)} call
+   */
+  public synchronized boolean add(ReplicationRequest repObj) {
+    // Drop the earlier message, if any, and insert this one. remove() is a
+    // no-op when no equal element is queued, so no contains() check is
+    // needed and the queue is scanned only once.
+    queue.remove(repObj);
+    return queue.add(repObj);
+  }
+
+  /**
+   * Removes a single queued request equal to the given one, if present.
+   *
+   * @param repObj request to remove
+   * @return {@code true} if a request was removed
+   */
+  public synchronized boolean remove(ReplicationRequest repObj) {
+    return queue.remove(repObj);
+  }
+
+  /**
+   * Retrieves, but does not remove, the head of this queue,
+   * or returns {@code null} if this queue is empty.
+   *
+   * @return the head of this queue, or {@code null} if this queue is empty
+   */
+  public synchronized ReplicationRequest peek() {
+    return queue.peek();
+  }
+
+  /**
+   * Retrieves and removes the head of this queue,
+   * or returns {@code null} if this queue is empty.
+   *
+   * @return the head of this queue, or {@code null} if this queue is empty
+   */
+  public synchronized ReplicationRequest poll() {
+    return queue.poll();
+  }
+
+  /**
+   * Removes every queued request that is equal to one of the given requests.
+   *
+   * @param repObjs requests to remove
+   * @return {@code true} if the queue changed as a result of the call
+   */
+  public synchronized boolean removeAll(List<ReplicationRequest> repObjs) {
+    return queue.removeAll(repObjs);
+  }
+
+  /**
+   * Returns the number of queued requests.
+   *
+   * @return current size of the queue
+   */
+  public synchronized int size() {
+    // Synchronized like every other accessor so the size is read under the
+    // same lock that guards mutations of the backing queue.
+    return queue.size();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9ec3d78/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java
new file mode 100644
index 0000000..a6ccce1
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.Serializable;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * Wrapper class for hdds replication queue. Implements its natural
+ * ordering for priority queue.
+ *
+ * <p>Note: the natural ordering is <em>not</em> consistent with
+ * {@link #equals(Object)}. Equality and hash code are based on the container
+ * id only, while {@link #compareTo(ReplicationRequest)} orders by replication
+ * deficit and then by timestamp.
+ */
+public class ReplicationRequest implements Comparable<ReplicationRequest>,
+    Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private final long containerId;
+  private final short replicationCount;
+  private final short expecReplicationCount;
+  private final long timestamp;
+
+  /**
+   * Creates a replication request.
+   *
+   * @param containerId id of the container the request refers to
+   * @param replicationCount replication count stored in this request
+   * @param timestamp timestamp stored in this request
+   * @param expecReplicationCount expected replication count
+   */
+  public ReplicationRequest(long containerId, short replicationCount,
+      long timestamp, short expecReplicationCount) {
+    this.containerId = containerId;
+    this.replicationCount = replicationCount;
+    this.timestamp = timestamp;
+    this.expecReplicationCount = expecReplicationCount;
+  }
+
+  /**
+   * Compares this request with the specified request for order.
+   *
+   * <p>Requests are ordered first by
+   * {@code replicationCount - expecReplicationCount} in ascending order, so
+   * the request that is furthest below its expected count sorts first. Ties
+   * are broken by timestamp in ascending order.
+   *
+   * @param o the request to be compared; a {@code null} argument is treated
+   *          as smaller than this request
+   * @return a negative integer, zero, or a positive integer as this request
+   *         is less than, equal to, or greater than the specified request;
+   *         {@code 1} if {@code o} is {@code null}
+   */
+  @Override
+  public int compareTo(ReplicationRequest o) {
+    if (o == null) {
+      return 1;
+    }
+    if (this == o) {
+      return 0;
+    }
+    // short operands are promoted to int, so the subtraction cannot overflow.
+    int retVal = Integer
+        .compare(getReplicationCount() - getExpecReplicationCount(),
+            o.getReplicationCount() - o.getExpecReplicationCount());
+    if (retVal != 0) {
+      return retVal;
+    }
+    return Long.compare(getTimestamp(), o.getTimestamp());
+  }
+
+  /**
+   * Hash code derived from the container id only, matching
+   * {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(91, 1011)
+        .append(getContainerId())
+        .toHashCode();
+  }
+
+  /**
+   * Two requests are equal when they are of the same class and have the same
+   * container id; replication counts and timestamp are not considered.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ReplicationRequest that = (ReplicationRequest) o;
+    return new EqualsBuilder().append(getContainerId(), that.getContainerId())
+        .isEquals();
+  }
+
+  public long getContainerId() {
+    return containerId;
+  }
+
+  public short getReplicationCount() {
+    return replicationCount;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public short getExpecReplicationCount() {
+    return expecReplicationCount;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9ec3d78/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
new file mode 100644
index 0000000..7f335e3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+/**
+ * Ozone Container replication related classes.
+ */
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9ec3d78/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
new file mode 100644
index 0000000..6d74c68
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.util.Random;
+import java.util.UUID;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for ReplicationQueue.
+ */
+public class TestReplicationQueue {
+
+  private ReplicationQueue replicationQueue;
+  private Random random;
+
+  @Before
+  public void setUp() {
+    replicationQueue = new ReplicationQueue();
+    random = new Random();
+  }
+
+  /**
+   * Adding several requests for the same container must keep only the most
+   * recently added one.
+   */
+  @Test
+  public void testDuplicateAddOp() {
+    long contId = random.nextLong();
+    ReplicationRequest obj1, obj2, obj3;
+    long time = Time.monotonicNow();
+    obj1 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
+    obj2 = new ReplicationRequest(contId, (short) 2, time + 1, (short) 3);
+    obj3 = new ReplicationRequest(contId, (short) 1, time + 2, (short) 3);
+
+    replicationQueue.add(obj1);
+    replicationQueue.add(obj2);
+    replicationQueue.add(obj3);
+    Assert.assertEquals("Should add only 1 msg as second one is duplicate",
+        1, replicationQueue.size());
+    ReplicationRequest temp = replicationQueue.poll();
+    // All three requests are equal() (same container id), so check identity
+    // to prove that the last one added replaced the earlier ones.
+    Assert.assertSame(obj3, temp);
+  }
+
+  /**
+   * Requests must be polled in order of replication deficit, then timestamp.
+   */
+  @Test
+  public void testPollOp() {
+    long contId = random.nextLong();
+    ReplicationRequest msg1, msg2, msg3, msg4, msg5;
+    msg1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
+        (short) 3);
+    long time = Time.monotonicNow();
+    msg2 = new ReplicationRequest(contId + 1, (short) 4, time, (short) 3);
+    msg3 = new ReplicationRequest(contId + 2, (short) 0, time, (short) 3);
+    // Same container as msg1, so it replaces msg1. The later timestamp makes
+    // its order relative to msg5 deterministic.
+    msg4 = new ReplicationRequest(contId, (short) 2, time + 1, (short) 3);
+    // Same container as msg2, so it replaces msg2.
+    msg5 = new ReplicationRequest(contId + 1, (short) 2, time, (short) 3);
+
+    replicationQueue.add(msg1);
+    replicationQueue.add(msg2);
+    replicationQueue.add(msg3);
+    replicationQueue.add(msg4);
+    replicationQueue.add(msg5);
+    Assert.assertEquals("Should have 3 objects",
+        3, replicationQueue.size());
+
+    // Since Priority queue orders messages according to replication count,
+    // message with lowest replication should be first
+    ReplicationRequest temp;
+    temp = replicationQueue.poll();
+    Assert.assertEquals("Should have 2 objects",
+        2, replicationQueue.size());
+    Assert.assertSame(msg3, temp);
+
+    // Message 5 should be ordered before message 4 as both have the same
+    // replication number but message 5 has the earlier timestamp.
+    temp = replicationQueue.poll();
+    Assert.assertEquals("Should have 1 objects",
+        1, replicationQueue.size());
+    Assert.assertSame(msg5, temp);
+
+    temp = replicationQueue.poll();
+    Assert.assertEquals("Should have 0 objects",
+        0, replicationQueue.size());
+    Assert.assertSame(msg4, temp);
+  }
+
+  /**
+   * Removing requests one by one must shrink the queue accordingly.
+   */
+  @Test
+  public void testRemoveOp() {
+    long contId = random.nextLong();
+    ReplicationRequest obj1, obj2, obj3;
+    obj1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
+        (short) 3);
+    obj2 = new ReplicationRequest(contId + 1, (short) 2, Time.monotonicNow(),
+        (short) 3);
+    obj3 = new ReplicationRequest(contId + 2, (short) 3, Time.monotonicNow(),
+        (short) 3);
+
+    replicationQueue.add(obj1);
+    replicationQueue.add(obj2);
+    replicationQueue.add(obj3);
+    Assert.assertEquals("Should have 3 objects",
+        3, replicationQueue.size());
+
+    replicationQueue.remove(obj3);
+    Assert.assertEquals("Should have 2 objects",
+        2, replicationQueue.size());
+
+    replicationQueue.remove(obj2);
+    Assert.assertEquals("Should have 1 objects",
+        1, replicationQueue.size());
+
+    replicationQueue.remove(obj1);
+    Assert.assertEquals("Should have 0 objects",
+        0, replicationQueue.size());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9ec3d78/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
new file mode 100644
index 0000000..5b1fd0f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Test classes for container replication functionality.
+ */
+package org.apache.hadoop.ozone.container.replication;
+// Test classes for Replication functionality.
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org