You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/07/02 20:32:29 UTC

[12/45] hadoop git commit: HDDS-186. Create under replicated queue. Contributed by Ajay Kumar.

HDDS-186. Create under replicated queue. Contributed by Ajay Kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e9ec3d78
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e9ec3d78
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e9ec3d78

Branch: refs/heads/HDDS-4
Commit: e9ec3d78f520a8543dc77d763d4b358aa608bae8
Parents: 56a4cdb
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Wed Jun 27 13:35:30 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Wed Jun 27 13:35:30 2018 -0700

----------------------------------------------------------------------
 .../container/replication/ReplicationQueue.java |  76 +++++++++++
 .../replication/ReplicationRequest.java         | 106 +++++++++++++++
 .../container/replication/package-info.java     |  23 ++++
 .../replication/TestReplicationQueue.java       | 134 +++++++++++++++++++
 .../container/replication/package-info.java     |  23 ++++
 5 files changed, 362 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9ec3d78/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
new file mode 100644
index 0000000..e0a2351
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+/**
+ * Priority queue to handle under-replicated and over replicated containers
+ * in ozone. ReplicationManager will consume these messages and decide
+ * accordingly.
+ */
+public class ReplicationQueue {
+
+  private final Queue<ReplicationRequest> queue;
+
+  ReplicationQueue() {
+    queue = new PriorityQueue<>();
+  }
+
+  public synchronized boolean add(ReplicationRequest repObj) {
+    if (this.queue.contains(repObj)) {
+      // Remove the earlier message and insert this one
+      this.queue.remove(repObj);
+    }
+    return this.queue.add(repObj);
+  }
+
+  public synchronized boolean remove(ReplicationRequest repObj) {
+    return queue.remove(repObj);
+  }
+
+  /**
+   * Retrieves, but does not remove, the head of this queue,
+   * or returns {@code null} if this queue is empty.
+   *
+   * @return the head of this queue, or {@code null} if this queue is empty
+   */
+  public synchronized ReplicationRequest peek() {
+    return queue.peek();
+  }
+
+  /**
+   * Retrieves and removes the head of this queue,
+   * or returns {@code null} if this queue is empty.
+   *
+   * @return the head of this queue, or {@code null} if this queue is empty
+   */
+  public synchronized ReplicationRequest poll() {
+    return queue.poll();
+  }
+
+  public synchronized boolean removeAll(List<ReplicationRequest> repObjs) {
+    return queue.removeAll(repObjs);
+  }
+
+  public int size() {
+    return queue.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9ec3d78/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java
new file mode 100644
index 0000000..a6ccce1
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.io.Serializable;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * Wrapper class for hdds replication queue. Implements its natural
+ * ordering for priority queue.
+ */
+public class ReplicationRequest implements Comparable<ReplicationRequest>,
+    Serializable {
+  private final long containerId;
+  private final short replicationCount;
+  private final short expecReplicationCount;
+  private final long timestamp;
+
+  public ReplicationRequest(long containerId, short replicationCount,
+      long timestamp, short expecReplicationCount) {
+    this.containerId = containerId;
+    this.replicationCount = replicationCount;
+    this.timestamp = timestamp;
+    this.expecReplicationCount = expecReplicationCount;
+  }
+
+  /**
+   * Compares this object with the specified object for order.  Returns a
+   * negative integer, zero, or a positive integer as this object is less
+   * than, equal to, or greater than the specified object.
+   * @param o the object to be compared.
+   * @return a negative integer, zero, or a positive integer as this object
+   * is less than, equal to, or greater than the specified object.
+   * @throws NullPointerException if the specified object is null
+   * @throws ClassCastException   if the specified object's type prevents it
+   *                              from being compared to this object.
+   */
+  @Override
+  public int compareTo(ReplicationRequest o) {
+    if (o == null) {
+      return 1;
+    }
+    if (this == o) {
+      return 0;
+    }
+    int retVal = Integer
+        .compare(getReplicationCount() - getExpecReplicationCount(),
+            o.getReplicationCount() - o.getExpecReplicationCount());
+    if (retVal != 0) {
+      return retVal;
+    }
+    return Long.compare(getTimestamp(), o.getTimestamp());
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(91, 1011)
+        .append(getContainerId())
+        .toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ReplicationRequest that = (ReplicationRequest) o;
+    return new EqualsBuilder().append(getContainerId(), that.getContainerId())
+        .isEquals();
+  }
+
+  public long getContainerId() {
+    return containerId;
+  }
+
+  public short getReplicationCount() {
+    return replicationCount;
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public short getExpecReplicationCount() {
+    return expecReplicationCount;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9ec3d78/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
new file mode 100644
index 0000000..7f335e3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+/**
+ * Ozone Container replicaton related classes.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9ec3d78/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
new file mode 100644
index 0000000..6d74c68
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import java.util.Random;
+import java.util.UUID;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class for ReplicationQueue.
+ */
+public class TestReplicationQueue {
+
+  private ReplicationQueue replicationQueue;
+  private Random random;
+
+  @Before
+  public void setUp() {
+    replicationQueue = new ReplicationQueue();
+    random = new Random();
+  }
+
+  @Test
+  public void testDuplicateAddOp() {
+    long contId = random.nextLong();
+    String nodeId = UUID.randomUUID().toString();
+    ReplicationRequest obj1, obj2, obj3;
+    long time = Time.monotonicNow();
+    obj1 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
+    obj2 = new ReplicationRequest(contId, (short) 2, time + 1, (short) 3);
+    obj3 = new ReplicationRequest(contId, (short) 1, time+2, (short) 3);
+
+    replicationQueue.add(obj1);
+    replicationQueue.add(obj2);
+    replicationQueue.add(obj3);
+    Assert.assertEquals("Should add only 1 msg as second one is duplicate",
+        1, replicationQueue.size());
+    ReplicationRequest temp = replicationQueue.poll();
+    Assert.assertEquals(temp, obj3);
+  }
+
+  @Test
+  public void testPollOp() {
+    long contId = random.nextLong();
+    String nodeId = UUID.randomUUID().toString();
+    ReplicationRequest msg1, msg2, msg3, msg4, msg5;
+    msg1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
+        (short) 3);
+    long time = Time.monotonicNow();
+    msg2 = new ReplicationRequest(contId + 1, (short) 4, time, (short) 3);
+    msg3 = new ReplicationRequest(contId + 2, (short) 0, time, (short) 3);
+    msg4 = new ReplicationRequest(contId, (short) 2, time, (short) 3);
+    // Replication message for same container but different nodeId
+    msg5 = new ReplicationRequest(contId + 1, (short) 2, time, (short) 3);
+
+    replicationQueue.add(msg1);
+    replicationQueue.add(msg2);
+    replicationQueue.add(msg3);
+    replicationQueue.add(msg4);
+    replicationQueue.add(msg5);
+    Assert.assertEquals("Should have 3 objects",
+        3, replicationQueue.size());
+
+    // Since Priority queue orders messages according to replication count,
+    // message with lowest replication should be first
+    ReplicationRequest temp;
+    temp = replicationQueue.poll();
+    Assert.assertEquals("Should have 2 objects",
+        2, replicationQueue.size());
+    Assert.assertEquals(temp, msg3);
+
+    temp = replicationQueue.poll();
+    Assert.assertEquals("Should have 1 objects",
+        1, replicationQueue.size());
+    Assert.assertEquals(temp, msg5);
+
+    // Message 2 should be ordered before message 5 as both have same replication
+    // number but message 2 has earlier timestamp.
+    temp = replicationQueue.poll();
+    Assert.assertEquals("Should have 0 objects",
+        replicationQueue.size(), 0);
+    Assert.assertEquals(temp, msg4);
+  }
+
+  @Test
+  public void testRemoveOp() {
+    long contId = random.nextLong();
+    String nodeId = UUID.randomUUID().toString();
+    ReplicationRequest obj1, obj2, obj3;
+    obj1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(),
+        (short) 3);
+    obj2 = new ReplicationRequest(contId + 1, (short) 2, Time.monotonicNow(),
+        (short) 3);
+    obj3 = new ReplicationRequest(contId + 2, (short) 3, Time.monotonicNow(),
+        (short) 3);
+
+    replicationQueue.add(obj1);
+    replicationQueue.add(obj2);
+    replicationQueue.add(obj3);
+    Assert.assertEquals("Should have 3 objects",
+        3, replicationQueue.size());
+
+    replicationQueue.remove(obj3);
+    Assert.assertEquals("Should have 2 objects",
+        2, replicationQueue.size());
+
+    replicationQueue.remove(obj2);
+    Assert.assertEquals("Should have 1 objects",
+        1, replicationQueue.size());
+
+    replicationQueue.remove(obj1);
+    Assert.assertEquals("Should have 0 objects",
+        0, replicationQueue.size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e9ec3d78/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
new file mode 100644
index 0000000..5b1fd0f
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * SCM Testing and Mocking Utils.
+ */
+package org.apache.hadoop.ozone.container.replication;
+// Test classes for Replication functionality.
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org