You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by um...@apache.org on 2022/07/02 21:38:19 UTC

[ozone] branch master updated: HDDS-6957. EC: ReplicationManager - priortise under replicated containers (#3574)

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 03cd7c4c5f HDDS-6957. EC: ReplicationManager - priortise under replicated containers (#3574)
03cd7c4c5f is described below

commit 03cd7c4c5f8f81c3d8375492a753ddf2563dc6ce
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Sat Jul 2 22:38:15 2022 +0100

    HDDS-6957. EC: ReplicationManager - priortise under replicated containers (#3574)
---
 .../replication/ContainerHealthResult.java         | 51 ++++++++++++-
 .../container/replication/ReplicationManager.java  | 77 +++++++++++++++++--
 .../replication/TestReplicationManager.java        | 89 +++++++++++++++++++++-
 3 files changed, 208 insertions(+), 9 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
index 554e7260d1..801d3b2a90 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
@@ -77,10 +77,24 @@ public class ContainerHealthResult {
   public static class UnderReplicatedHealthResult
       extends ContainerHealthResult {
 
+    // For under replicated containers, the best remaining redundancy we can
+    // have is 3 for EC-10-4, 2 for EC-6-3, 1 for EC-3-2 and 2 for Ratis.
+    // A container which is under-replicated due to decommission will have one
+    // more, ie 4, 3, 2, 3 respectively. Ideally we want to sort decommission
+    // only under-replication after all other under-replicated containers.
+    // It may also make sense to allow under-replicated containers a chance to
+    // retry once before processing the decommission only under replication.
+    // Therefore we should adjust the weighted remaining redundancy of
+    // decommission only under-replicated containers to a floor of 5 so they
+    // sort after an under-replicated container with a remaining redundancy
+    // of 3 (EC-10-4) plus one retry.
+    private static final int DECOMMISSION_REDUNDANCY = 5;
+
     private final int remainingRedundancy;
     private final boolean dueToDecommission;
     private final boolean sufficientlyReplicatedAfterPending;
     private final boolean unrecoverable;
+    private int requeueCount = 0;
 
     UnderReplicatedHealthResult(ContainerInfo containerInfo,
         int remainingRedundancy, boolean dueToDecommission,
@@ -93,7 +107,7 @@ public class ContainerHealthResult {
     }
 
     /**
-     * How many more replicas can be lost before the the container is
+     * How many more replicas can be lost before the container is
      * unreadable. For containers which are under-replicated due to decommission
      * or maintenance only, the remaining redundancy will include those
      * decommissioning or maintenance replicas, as they are technically still
@@ -104,6 +118,41 @@ public class ContainerHealthResult {
       return remainingRedundancy;
     }
 
+    /**
+     * The weighted redundancy is the remaining redundancy plus the requeue
+     * count. When this value is used for ordering in a priority queue it
+     * ensures the priority is reduced each time the result is requeued, to
+     * prevent it from blocking other containers from being processed.
+     * Additionally, so that containers which are under-replicated only due to
+     * decommission or maintenance are not ordered ahead of genuinely
+     * under-replicated containers, DECOMMISSION_REDUNDANCY is used in place of
+     * their real remaining redundancy.
+     * @return The weightedRedundancy of this result.
+     */
+    public int getWeightedRedundancy() {
+      int result = requeueCount;
+      if (dueToDecommission) {
+        result += DECOMMISSION_REDUNDANCY;
+      } else {
+        result += remainingRedundancy;
+      }
+      return result;
+    }
+
+    /**
+     * If there is an attempt to process this under-replicated result, and it
+     * fails and has to be requeued, this method should be called to increment
+     * the requeue count to ensure the result is not placed back at the head
+     * of the queue.
+     */
+    public void incrementRequeueCount() {
+      ++requeueCount;
+    }
+
+    public int getRequeueCount() {
+      return requeueCount;
+    }
+
     /**
      * Indicates whether the under-replication is caused only by replicas
      * being decommissioned or entering maintenance. Ie, there are not replicas
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index a6d00983aa..3e125d0783 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -49,8 +49,11 @@ import java.io.IOException;
 import java.time.Clock;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -131,6 +134,9 @@ public class ReplicationManager implements SCMService {
   private final ContainerReplicaPendingOps containerReplicaPendingOps;
   private final ContainerHealthCheck ecContainerHealthCheck;
   private final EventPublisher eventPublisher;
+  private final ReentrantLock lock = new ReentrantLock();
+  private Queue<ContainerHealthResult.UnderReplicatedHealthResult>
+      underRepQueue;
 
   /**
    * Constructs ReplicationManager instance with the given configuration.
@@ -167,6 +173,7 @@ public class ReplicationManager implements SCMService {
     this.legacyReplicationManager = legacyReplicationManager;
     this.ecContainerHealthCheck = new ECContainerHealthCheck();
     this.nodeManager = nodeManager;
+    this.underRepQueue = createUnderReplicatedQueue();
     start();
   }
 
@@ -233,8 +240,8 @@ public class ReplicationManager implements SCMService {
     final List<ContainerInfo> containers =
         containerManager.getContainers();
     ReplicationManagerReport report = new ReplicationManagerReport();
-    List<ContainerHealthResult.UnderReplicatedHealthResult> underReplicated =
-        new ArrayList<>();
+    Queue<ContainerHealthResult.UnderReplicatedHealthResult>
+        underReplicated = createUnderReplicatedQueue();
     List<ContainerHealthResult.OverReplicatedHealthResult> overReplicated =
         new ArrayList<>();
 
@@ -255,16 +262,61 @@ public class ReplicationManager implements SCMService {
       }
     }
     report.setComplete();
-    // TODO - Sort the pending lists by priority and assign to the main queue,
-    //        which is yet to be defined.
+    lock.lock();
+    try {
+      underRepQueue = underReplicated;
+    } finally {
+      lock.unlock();
+    }
     this.containerReport = report;
     LOG.info("Replication Monitor Thread took {} milliseconds for" +
             " processing {} containers.", clock.millis() - start,
         containers.size());
   }
 
+  /**
+   * Retrieve the next highest priority container to be replicated from the
+   * under replicated queue.
+   * @return The next under-replicated container to be processed, or null if
+   *         the queue is empty.
+   */
+  public ContainerHealthResult.UnderReplicatedHealthResult
+      dequeueUnderReplicatedContainer() {
+    lock.lock();
+    try {
+      return underRepQueue.poll();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Add an under replicated container back to the queue if it was unable to
+   * be processed. Its retry count will be incremented before it is re-queued,
+   * reducing its priority.
+   * Note that the queue could have been rebuilt and replaced after this
+   * result was dequeued but before it is added back. This will result in a
+   * duplicate entry on the queue. However, when it is processed again, the
+   * result of the processing will end up with pending replicas scheduled. If
+   * instance 1 is processed and creates the pending replicas, when instance 2
+   * is processed, it will find the pending replicas and know it has no work
+   * to do, and be discarded. Additionally, the queue will be refreshed
+   * periodically removing any duplicates.
+   * @param underReplicatedHealthResult The result to add back to the queue.
+   */
+  public void requeueUnderReplicatedContainer(ContainerHealthResult
+      .UnderReplicatedHealthResult underReplicatedHealthResult) {
+    underReplicatedHealthResult.incrementRequeueCount();
+    lock.lock();
+    try {
+      underRepQueue.add(underReplicatedHealthResult);
+    } finally {
+      lock.unlock();
+    }
+  }
+
   protected ContainerHealthResult processContainer(ContainerInfo containerInfo,
-      List<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
+      Queue<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
       List<ContainerHealthResult.OverReplicatedHealthResult> overRep,
       ReplicationManagerReport report) throws ContainerNotFoundException {
     Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
@@ -303,6 +355,21 @@ public class ReplicationManager implements SCMService {
     return health;
   }
 
+  /**
+   * Creates a priority queue of UnderReplicatedHealthResult, where the elements
+   * are ordered by the weighted redundancy of the container. This means that
+   * containers with the least remaining redundancy are at the front of the
+   * queue, and will be processed first.
+   * @return An empty instance of a PriorityQueue.
+   */
+  protected PriorityQueue<ContainerHealthResult.UnderReplicatedHealthResult>
+      createUnderReplicatedQueue() {
+    return new PriorityQueue<>(Comparator.comparing(ContainerHealthResult
+            .UnderReplicatedHealthResult::getWeightedRedundancy)
+        .thenComparing(ContainerHealthResult
+            .UnderReplicatedHealthResult::getRequeueCount));
+  }
+
   public ReplicationManagerReport getContainerReport() {
     return containerReport;
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index fdce87f8ba..74a1955dd7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.ozone.test.TestClock;
@@ -43,11 +45,14 @@ import java.time.Instant;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
 import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerInfo;
 import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createReplicas;
 
@@ -68,9 +73,10 @@ public class TestReplicationManager {
   private ContainerReplicaPendingOps containerReplicaPendingOps;
 
   private Map<ContainerID, Set<ContainerReplica>> containerReplicaMap;
+  private Set<ContainerInfo> containerInfoSet;
   private ReplicationConfig repConfig;
   private ReplicationManagerReport repReport;
-  private List<ContainerHealthResult.UnderReplicatedHealthResult> underRep;
+  private Queue<ContainerHealthResult.UnderReplicatedHealthResult> underRep;
   private List<ContainerHealthResult.OverReplicatedHealthResult> overRep;
 
   @Before
@@ -94,6 +100,9 @@ public class TestReplicationManager {
             return containerReplicaMap.get(cid);
           });
 
+    Mockito.when(containerManager.getContainers()).thenAnswer(
+        invocation -> new ArrayList<>(containerInfoSet));
+
     replicationManager = new ReplicationManager(
         configuration,
         containerManager,
@@ -105,10 +114,18 @@ public class TestReplicationManager {
         legacyReplicationManager,
         containerReplicaPendingOps);
     containerReplicaMap = new HashMap<>();
+    containerInfoSet = new HashSet<>();
     repConfig = new ECReplicationConfig(3, 2);
     repReport = new ReplicationManagerReport();
-    underRep = new ArrayList<>();
+    underRep = replicationManager.createUnderReplicatedQueue();
     overRep = new ArrayList<>();
+
+    // Ensure that RM will run when asked.
+    Mockito.when(scmContext.isLeaderReady()).thenReturn(true);
+    Mockito.when(scmContext.isInSafeMode()).thenReturn(false);
+    SCMServiceManager serviceManager = new SCMServiceManager();
+    serviceManager.register(replicationManager);
+    serviceManager.notifyStatusChanged();
   }
 
   @Test
@@ -245,12 +262,78 @@ public class TestReplicationManager {
         ReplicationManagerReport.HealthState.OVER_REPLICATED));
   }
 
+  @Test
+  public void testUnderReplicationQueuePopulated() {
+    ContainerInfo decomContainer = createContainerInfo(repConfig, 1,
+        HddsProtos.LifeCycleState.CLOSED);
+    addReplicas(decomContainer, Pair.of(DECOMMISSIONING, 1),
+        Pair.of(DECOMMISSIONING, 2), Pair.of(DECOMMISSIONING, 3),
+        Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONING, 5));
+
+    ContainerInfo underRep1 = createContainerInfo(repConfig, 2,
+        HddsProtos.LifeCycleState.CLOSED);
+    addReplicas(underRep1, 1, 2, 3, 4);
+    ContainerInfo underRep0 = createContainerInfo(repConfig, 3,
+        HddsProtos.LifeCycleState.CLOSED);
+    addReplicas(underRep0, 1, 2, 3);
+
+    replicationManager.processAll();
+
+    // Get the first message off the queue - it should be underRep0.
+    ContainerHealthResult.UnderReplicatedHealthResult res
+        = replicationManager.dequeueUnderReplicatedContainer();
+    Assert.assertEquals(underRep0, res.getContainerInfo());
+
+    // Now requeue it
+    replicationManager.requeueUnderReplicatedContainer(res);
+
+    // Now get the next message. It should be underRep1, as it has remaining
+    // redundancy 1 + zero retries. UnderRep0 will have remaining redundancy 0
+    // and 1 retry. They will have the same weighted redundancy so the one with
+    // fewer retries should come first.
+    res = replicationManager.dequeueUnderReplicatedContainer();
+    Assert.assertEquals(underRep1, res.getContainerInfo());
+
+    // Next message is underRep0. It starts with a weighted redundancy of 0 + 1
+    // retry. The other message on the queue is a decommission only with a
+    // weighted redundancy of 5 + 0. So let's dequeue and requeue the message 4
+    // times. Then the weighted redundancy will be equal and the decommission
+    // one will be next due to having fewer retries.
+    for (int i = 0; i < 4; i++) {
+      res = replicationManager.dequeueUnderReplicatedContainer();
+      Assert.assertEquals(underRep0, res.getContainerInfo());
+      replicationManager.requeueUnderReplicatedContainer(res);
+    }
+    res = replicationManager.dequeueUnderReplicatedContainer();
+    Assert.assertEquals(decomContainer, res.getContainerInfo());
+
+    res = replicationManager.dequeueUnderReplicatedContainer();
+    Assert.assertEquals(underRep0, res.getContainerInfo());
+
+    res = replicationManager.dequeueUnderReplicatedContainer();
+    Assert.assertNull(res);
+  }
+
+  private Set<ContainerReplica>  addReplicas(ContainerInfo container,
+      Pair<HddsProtos.NodeOperationalState, Integer>... nodes) {
+    final Set<ContainerReplica> replicas =
+        createReplicas(container.containerID(), nodes);
+    storeContainerAndReplicas(container, replicas);
+    return replicas;
+  }
+
   private Set<ContainerReplica> addReplicas(ContainerInfo container,
       int... indexes) {
     final Set<ContainerReplica> replicas =
         createReplicas(container.containerID(), indexes);
-    containerReplicaMap.put(container.containerID(), replicas);
+    storeContainerAndReplicas(container, replicas);
     return replicas;
   }
 
+  private void storeContainerAndReplicas(ContainerInfo container,
+      Set<ContainerReplica> replicas) {
+    containerReplicaMap.put(container.containerID(), replicas);
+    containerInfoSet.add(container);
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org