Posted to commits@ozone.apache.org by um...@apache.org on 2022/07/02 21:38:19 UTC
[ozone] branch master updated: HDDS-6957. EC: ReplicationManager - prioritise under replicated containers (#3574)
This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 03cd7c4c5f HDDS-6957. EC: ReplicationManager - prioritise under replicated containers (#3574)
03cd7c4c5f is described below
commit 03cd7c4c5f8f81c3d8375492a753ddf2563dc6ce
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Sat Jul 2 22:38:15 2022 +0100
HDDS-6957. EC: ReplicationManager - prioritise under replicated containers (#3574)
---
.../replication/ContainerHealthResult.java | 51 ++++++++++++-
.../container/replication/ReplicationManager.java | 77 +++++++++++++++++--
.../replication/TestReplicationManager.java | 89 +++++++++++++++++++++-
3 files changed, 208 insertions(+), 9 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
index 554e7260d1..801d3b2a90 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
@@ -77,10 +77,24 @@ public class ContainerHealthResult {
public static class UnderReplicatedHealthResult
extends ContainerHealthResult {
+ // For under-replicated containers, the best remaining redundancy we can
+ // have is 3 for EC-10-4, 2 for EC-6-3, 1 for EC-3-2 and 2 for Ratis.
+ // A container which is under-replicated due to decommission will have one
+ // more, i.e. 4, 3, 2 and 3 respectively. Ideally we want to sort
+ // decommission-only under-replication after all other under-replicated
+ // containers.
+ // It may also make sense to allow under-replicated containers a chance to
+ // retry once before processing the decommission-only under-replication.
+ // Therefore we adjust the weighted remaining redundancy of decommission-only
+ // under-replicated containers to a floor of 5, so they sort after an
+ // under-replicated container with a remaining redundancy of 3 (EC-10-4)
+ // plus one retry.
+ private static final int DECOMMISSION_REDUNDANCY = 5;
+
private final int remainingRedundancy;
private final boolean dueToDecommission;
private final boolean sufficientlyReplicatedAfterPending;
private final boolean unrecoverable;
+ private int requeueCount = 0;
UnderReplicatedHealthResult(ContainerInfo containerInfo,
int remainingRedundancy, boolean dueToDecommission,
@@ -93,7 +107,7 @@ public class ContainerHealthResult {
}
/**
- * How many more replicas can be lost before the the container is
+ * How many more replicas can be lost before the container is
* unreadable. For containers which are under-replicated due to decommission
* or maintenance only, the remaining redundancy will include those
* decommissioning or maintenance replicas, as they are technically still
@@ -104,6 +118,41 @@ public class ContainerHealthResult {
return remainingRedundancy;
}
+ /**
+ * The weightedRedundancy is the remaining redundancy + the requeue count.
+ * When this value is used for ordering in a priority queue, it ensures the
+ * priority is reduced each time the result is requeued, preventing it from
+ * blocking other containers from being processed.
+ * Additionally, so that decommission-only and maintenance-only results are
+ * not ordered ahead of genuinely under-replicated containers,
+ * DECOMMISSION_REDUNDANCY is used as their redundancy rather than the real
+ * remaining redundancy.
+ * @return The weightedRedundancy of this result.
+ */
+ public int getWeightedRedundancy() {
+ int result = requeueCount;
+ if (dueToDecommission) {
+ result += DECOMMISSION_REDUNDANCY;
+ } else {
+ result += remainingRedundancy;
+ }
+ return result;
+ }
+
+ /**
+ * If an attempt to process this under-replicated result fails and the
+ * result has to be requeued, this method should be called to increment the
+ * requeue count, ensuring the result is not placed back at the head of
+ * the queue.
+ */
+ public void incrementRequeueCount() {
+ ++requeueCount;
+ }
+
+ public int getRequeueCount() {
+ return requeueCount;
+ }
+
/**
* Indicates whether the under-replication is caused only by replicas
* being decommissioned or entering maintenance. Ie, there are not replicas
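
To make the weighting above concrete, here is a minimal standalone sketch of the calculation described in the DECOMMISSION_REDUNDANCY comment and the getWeightedRedundancy() javadoc. It is not part of this patch: the WeightedRedundancyDemo class, the weightedRedundancy helper and the sample replica counts are invented purely for illustration.

    public class WeightedRedundancyDemo {

      // Same floor as the patch: decommission-only results use 5 in place of
      // their real remaining redundancy.
      static final int DECOMMISSION_REDUNDANCY = 5;

      // Mirrors the logic of UnderReplicatedHealthResult#getWeightedRedundancy().
      static int weightedRedundancy(int remainingRedundancy,
          boolean dueToDecommission, int requeueCount) {
        return requeueCount
            + (dueToDecommission ? DECOMMISSION_REDUNDANCY : remainingRedundancy);
      }

      public static void main(String[] args) {
        // EC-3-2 container missing two data replicas, never requeued.
        System.out.println(weightedRedundancy(0, false, 0)); // 0 - processed first
        // EC-10-4 container missing one replica, requeued once.
        System.out.println(weightedRedundancy(3, false, 1)); // 4
        // Decommission-only under-replication sorts after both of the above.
        System.out.println(weightedRedundancy(2, true, 0));  // 5
      }
    }
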
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index a6d00983aa..3e125d0783 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -49,8 +49,11 @@ import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -131,6 +134,9 @@ public class ReplicationManager implements SCMService {
private final ContainerReplicaPendingOps containerReplicaPendingOps;
private final ContainerHealthCheck ecContainerHealthCheck;
private final EventPublisher eventPublisher;
+ private final ReentrantLock lock = new ReentrantLock();
+ private Queue<ContainerHealthResult.UnderReplicatedHealthResult>
+ underRepQueue;
/**
* Constructs ReplicationManager instance with the given configuration.
@@ -167,6 +173,7 @@ public class ReplicationManager implements SCMService {
this.legacyReplicationManager = legacyReplicationManager;
this.ecContainerHealthCheck = new ECContainerHealthCheck();
this.nodeManager = nodeManager;
+ this.underRepQueue = createUnderReplicatedQueue();
start();
}
@@ -233,8 +240,8 @@ public class ReplicationManager implements SCMService {
final List<ContainerInfo> containers =
containerManager.getContainers();
ReplicationManagerReport report = new ReplicationManagerReport();
- List<ContainerHealthResult.UnderReplicatedHealthResult> underReplicated =
- new ArrayList<>();
+ Queue<ContainerHealthResult.UnderReplicatedHealthResult>
+ underReplicated = createUnderReplicatedQueue();
List<ContainerHealthResult.OverReplicatedHealthResult> overReplicated =
new ArrayList<>();
@@ -255,16 +262,61 @@ public class ReplicationManager implements SCMService {
}
}
report.setComplete();
- // TODO - Sort the pending lists by priority and assign to the main queue,
- // which is yet to be defined.
+ lock.lock();
+ try {
+ underRepQueue = underReplicated;
+ } finally {
+ lock.unlock();
+ }
this.containerReport = report;
LOG.info("Replication Monitor Thread took {} milliseconds for" +
" processing {} containers.", clock.millis() - start,
containers.size());
}
+ /**
+ * Retrieve the next highest priority container to be replicated from the
+ * under-replicated queue.
+ * @return The next under-replicated result to be processed, or null if the
+ * queue is empty.
+ */
+ public ContainerHealthResult.UnderReplicatedHealthResult
+ dequeueUnderReplicatedContainer() {
+ lock.lock();
+ try {
+ return underRepQueue.poll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Add an under-replicated container back to the queue if it was unable to
+ * be processed. Its requeue count will be incremented before it is
+ * re-queued, reducing its priority.
+ * Note that the queue could have been rebuilt and replaced after this
+ * result was removed but before it is added back, which will result in a
+ * duplicate entry on the queue. However, this is harmless: when the first
+ * instance is processed it will schedule the pending replicas, so when the
+ * second instance is processed it will find those pending replicas, see
+ * there is no work left to do, and be discarded. Additionally, the queue is
+ * refreshed periodically, removing any duplicates.
+ * @param underReplicatedHealthResult The result to be re-queued.
+ */
+ public void requeueUnderReplicatedContainer(ContainerHealthResult
+ .UnderReplicatedHealthResult underReplicatedHealthResult) {
+ underReplicatedHealthResult.incrementRequeueCount();
+ lock.lock();
+ try {
+ underRepQueue.add(underReplicatedHealthResult);
+ } finally {
+ lock.unlock();
+ }
+ }
+
protected ContainerHealthResult processContainer(ContainerInfo containerInfo,
- List<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
+ Queue<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
List<ContainerHealthResult.OverReplicatedHealthResult> overRep,
ReplicationManagerReport report) throws ContainerNotFoundException {
Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
@@ -303,6 +355,21 @@ public class ReplicationManager implements SCMService {
return health;
}
+ /**
+ * Creates a priority queue of UnderReplicatedHealthResult, where the elements
+ * are ordered by the weighted redundancy of the container. This means that
+ * containers with the least remaining redundancy are at the front of the
+ * queue, and will be processed first.
+ * @return An empty instance of a PriorityQueue.
+ */
+ protected PriorityQueue<ContainerHealthResult.UnderReplicatedHealthResult>
+ createUnderReplicatedQueue() {
+ return new PriorityQueue<>(Comparator.comparing(ContainerHealthResult
+ .UnderReplicatedHealthResult::getWeightedRedundancy)
+ .thenComparing(ContainerHealthResult
+ .UnderReplicatedHealthResult::getRequeueCount));
+ }
+
public ReplicationManagerReport getContainerReport() {
return containerReport;
}
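
The dequeue/requeue pair added above is intended to be driven by a consumer that pulls the highest priority result off the queue and puts failed items back at reduced priority. The following is a hypothetical sketch of that cycle, not part of this patch; handleUnderReplicatedContainer is a placeholder name, not an existing Ozone method.

    private void drainUnderReplicatedQueue(ReplicationManager replicationManager) {
      ContainerHealthResult.UnderReplicatedHealthResult result;
      while ((result = replicationManager.dequeueUnderReplicatedContainer()) != null) {
        try {
          // Placeholder for whatever logic eventually schedules the
          // reconstruction or replication commands for this container.
          handleUnderReplicatedContainer(result);
        } catch (Exception e) {
          // Requeueing increments the requeue count, so the failed result
          // sorts behind containers that have not yet failed processing.
          replicationManager.requeueUnderReplicatedContainer(result);
        }
      }
    }
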
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index fdce87f8ba..74a1955dd7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdds.scm.container.replication;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.ozone.test.TestClock;
@@ -43,11 +45,14 @@ import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
@@ -68,9 +73,10 @@ public class TestReplicationManager {
private ContainerReplicaPendingOps containerReplicaPendingOps;
private Map<ContainerID, Set<ContainerReplica>> containerReplicaMap;
+ private Set<ContainerInfo> containerInfoSet;
private ReplicationConfig repConfig;
private ReplicationManagerReport repReport;
- private List<ContainerHealthResult.UnderReplicatedHealthResult> underRep;
+ private Queue<ContainerHealthResult.UnderReplicatedHealthResult> underRep;
private List<ContainerHealthResult.OverReplicatedHealthResult> overRep;
@Before
@@ -94,6 +100,9 @@ public class TestReplicationManager {
return containerReplicaMap.get(cid);
});
+ Mockito.when(containerManager.getContainers()).thenAnswer(
+ invocation -> new ArrayList<>(containerInfoSet));
+
replicationManager = new ReplicationManager(
configuration,
containerManager,
@@ -105,10 +114,18 @@ public class TestReplicationManager {
legacyReplicationManager,
containerReplicaPendingOps);
containerReplicaMap = new HashMap<>();
+ containerInfoSet = new HashSet<>();
repConfig = new ECReplicationConfig(3, 2);
repReport = new ReplicationManagerReport();
- underRep = new ArrayList<>();
+ underRep = replicationManager.createUnderReplicatedQueue();
overRep = new ArrayList<>();
+
+ // Ensure that RM will run when asked.
+ Mockito.when(scmContext.isLeaderReady()).thenReturn(true);
+ Mockito.when(scmContext.isInSafeMode()).thenReturn(false);
+ SCMServiceManager serviceManager = new SCMServiceManager();
+ serviceManager.register(replicationManager);
+ serviceManager.notifyStatusChanged();
}
@Test
@@ -245,12 +262,78 @@ public class TestReplicationManager {
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}
+ @Test
+ public void testUnderReplicationQueuePopulated() {
+ ContainerInfo decomContainer = createContainerInfo(repConfig, 1,
+ HddsProtos.LifeCycleState.CLOSED);
+ addReplicas(decomContainer, Pair.of(DECOMMISSIONING, 1),
+ Pair.of(DECOMMISSIONING, 2), Pair.of(DECOMMISSIONING, 3),
+ Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONING, 5));
+
+ ContainerInfo underRep1 = createContainerInfo(repConfig, 2,
+ HddsProtos.LifeCycleState.CLOSED);
+ addReplicas(underRep1, 1, 2, 3, 4);
+ ContainerInfo underRep0 = createContainerInfo(repConfig, 3,
+ HddsProtos.LifeCycleState.CLOSED);
+ addReplicas(underRep0, 1, 2, 3);
+
+ replicationManager.processAll();
+
+ // Get the first message off the queue - it should be underRep0.
+ ContainerHealthResult.UnderReplicatedHealthResult res
+ = replicationManager.dequeueUnderReplicatedContainer();
+ Assert.assertEquals(underRep0, res.getContainerInfo());
+
+ // Now requeue it
+ replicationManager.requeueUnderReplicatedContainer(res);
+
+ // Now get the next message. It should be underRep1, as it has remaining
+ // redundancy 1 + zero retries. UnderRep0 will have remaining redundancy 0
+ // and 1 retry. They will have the same weighted redundancy, so the one
+ // with fewer retries should come first.
+ res = replicationManager.dequeueUnderReplicatedContainer();
+ Assert.assertEquals(underRep1, res.getContainerInfo());
+
+ // Next message is underRep0. It starts with a weighted redundancy of 0 + 1
+ // retry. The other message on the queue is a decommission-only result with
+ // a weighted redundancy of 5 + 0. So let's dequeue and requeue the message
+ // 4 times. Then the weighted redundancies will be equal and the
+ // decommission-only one will be next due to having fewer retries.
+ for (int i = 0; i < 4; i++) {
+ res = replicationManager.dequeueUnderReplicatedContainer();
+ Assert.assertEquals(underRep0, res.getContainerInfo());
+ replicationManager.requeueUnderReplicatedContainer(res);
+ }
+ res = replicationManager.dequeueUnderReplicatedContainer();
+ Assert.assertEquals(decomContainer, res.getContainerInfo());
+
+ res = replicationManager.dequeueUnderReplicatedContainer();
+ Assert.assertEquals(underRep0, res.getContainerInfo());
+
+ res = replicationManager.dequeueUnderReplicatedContainer();
+ Assert.assertNull(res);
+ }
+
+ private Set<ContainerReplica> addReplicas(ContainerInfo container,
+ Pair<HddsProtos.NodeOperationalState, Integer>... nodes) {
+ final Set<ContainerReplica> replicas =
+ createReplicas(container.containerID(), nodes);
+ storeContainerAndReplicas(container, replicas);
+ return replicas;
+ }
+
private Set<ContainerReplica> addReplicas(ContainerInfo container,
int... indexes) {
final Set<ContainerReplica> replicas =
createReplicas(container.containerID(), indexes);
- containerReplicaMap.put(container.containerID(), replicas);
+ storeContainerAndReplicas(container, replicas);
return replicas;
}
+ private void storeContainerAndReplicas(ContainerInfo container,
+ Set<ContainerReplica> replicas) {
+ containerReplicaMap.put(container.containerID(), replicas);
+ containerInfoSet.add(container);
+ }
+
}
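
For reference, the ordering asserted in testUnderReplicationQueuePopulated can be replayed with a plain PriorityQueue and the same two-level comparator used by createUnderReplicatedQueue. This standalone sketch is not part of the patch; the Entry class is a stand-in for UnderReplicatedHealthResult and exists only for the example.

    import java.util.Comparator;
    import java.util.PriorityQueue;

    public class QueueOrderDemo {

      static final int DECOMMISSION_REDUNDANCY = 5;

      static final class Entry {
        final String name;
        final int remainingRedundancy;
        final boolean dueToDecommission;
        int requeueCount;

        Entry(String name, int remainingRedundancy, boolean dueToDecommission) {
          this.name = name;
          this.remainingRedundancy = remainingRedundancy;
          this.dueToDecommission = dueToDecommission;
        }

        int weightedRedundancy() {
          return requeueCount
              + (dueToDecommission ? DECOMMISSION_REDUNDANCY : remainingRedundancy);
        }
      }

      public static void main(String[] args) {
        // Same ordering as the patch: weighted redundancy first, then the
        // requeue count as a tie breaker.
        PriorityQueue<Entry> queue = new PriorityQueue<>(
            Comparator.comparingInt(Entry::weightedRedundancy)
                .thenComparingInt(e -> e.requeueCount));

        queue.add(new Entry("decomContainer", 2, true));  // weighted 5 + 0
        queue.add(new Entry("underRep1", 1, false));      // weighted 1 + 0
        queue.add(new Entry("underRep0", 0, false));      // weighted 0 + 0

        // underRep0 is polled first; requeue it once so it becomes 0 + 1.
        Entry first = queue.poll();
        first.requeueCount++;
        queue.add(first);

        // underRep1 and underRep0 now tie on weighted redundancy (1), but
        // underRep1 has fewer requeues, so it comes out next, just as the
        // test asserts.
        System.out.println(queue.poll().name); // underRep1
      }
    }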