Posted to commits@ozone.apache.org by so...@apache.org on 2022/09/20 09:04:40 UTC

[ozone] branch master updated: HDDS-7221. EC: ReplicationManager - Encapsulate the under and over rep queues into a queue object (#3758)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1107fe711c HDDS-7221. EC: ReplicationManager - Encapsulate the under and over rep queues into a queue object (#3758)
1107fe711c is described below

commit 1107fe711ccce396d92af233b1f05cea90f39927
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Tue Sep 20 10:04:33 2022 +0100

    HDDS-7221. EC: ReplicationManager - Encapsulate the under and over rep queues into a queue object (#3758)
---
 .../replication/ContainerCheckRequest.java         | 35 +++--------
 .../container/replication/ReplicationManager.java  | 54 ++++------------
 .../container/replication/ReplicationQueue.java    | 73 ++++++++++++++++++++++
 .../health/ECReplicationCheckHandler.java          |  4 +-
 .../replication/TestReplicationManager.java        | 62 +++++++++---------
 .../health/TestECReplicationCheckHandler.java      | 56 ++++++++---------
 6 files changed, 149 insertions(+), 135 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerCheckRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerCheckRequest.java
index 0433bd5eee..6053a26d27 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerCheckRequest.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerCheckRequest.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
 
 import java.util.List;
-import java.util.Queue;
 import java.util.Set;
 
 /**
@@ -35,10 +34,7 @@ public final class ContainerCheckRequest {
   private final List<ContainerReplicaOp> pendingOps;
   private final int maintenanceRedundancy;
   private final ReplicationManagerReport report;
-  private final Queue<ContainerHealthResult.UnderReplicatedHealthResult>
-      underRepQueue;
-  private final Queue<ContainerHealthResult.OverReplicatedHealthResult>
-      overRepQueue;
+  private final ReplicationQueue replicationQueue;
 
 
   private ContainerCheckRequest(Builder builder) {
@@ -47,8 +43,7 @@ public final class ContainerCheckRequest {
     this.pendingOps = builder.pendingOps;
     this.maintenanceRedundancy = builder.maintenanceRedundancy;
     this.report = builder.report;
-    this.overRepQueue = builder.overRepQueue;
-    this.underRepQueue = builder.underRepQueue;
+    this.replicationQueue = builder.replicationQueue;
   }
 
   public List<ContainerReplicaOp> getPendingOps() {
@@ -71,14 +66,8 @@ public final class ContainerCheckRequest {
     return report;
   }
 
-  public Queue<ContainerHealthResult.UnderReplicatedHealthResult>
-      getUnderRepQueue() {
-    return underRepQueue;
-  }
-
-  public Queue<ContainerHealthResult.OverReplicatedHealthResult>
-      getOverRepQueue() {
-    return overRepQueue;
+  public ReplicationQueue getReplicationQueue() {
+    return replicationQueue;
   }
 
   /**
@@ -91,10 +80,7 @@ public final class ContainerCheckRequest {
     private List<ContainerReplicaOp> pendingOps;
     private int maintenanceRedundancy;
     private ReplicationManagerReport report;
-    private Queue<ContainerHealthResult.UnderReplicatedHealthResult>
-        underRepQueue;
-    private Queue<ContainerHealthResult.OverReplicatedHealthResult>
-        overRepQueue;
+    private ReplicationQueue replicationQueue;
 
     public Builder setContainerInfo(ContainerInfo containerInfo) {
       this.containerInfo = containerInfo;
@@ -117,15 +103,8 @@ public final class ContainerCheckRequest {
       return this;
     }
 
-    public Builder setUnderRepQueue(
-        Queue<ContainerHealthResult.UnderReplicatedHealthResult> queue) {
-      this.underRepQueue = queue;
-      return this;
-    }
-
-    public Builder setOverRepQueue(
-        Queue<ContainerHealthResult.OverReplicatedHealthResult> queue) {
-      this.overRepQueue = queue;
+    public Builder setReplicationQueue(ReplicationQueue repQueue) {
+      this.replicationQueue = repQueue;
       return this;
     }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 699d523e72..bc615c91e8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -58,12 +58,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.time.Clock;
 import java.time.Duration;
-import java.util.Comparator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -146,10 +142,7 @@ public class ReplicationManager implements SCMService {
   private final ECReplicationCheckHandler ecReplicationCheckHandler;
   private final EventPublisher eventPublisher;
   private final ReentrantLock lock = new ReentrantLock();
-  private Queue<ContainerHealthResult.UnderReplicatedHealthResult>
-      underRepQueue;
-  private Queue<ContainerHealthResult.OverReplicatedHealthResult>
-      overRepQueue;
+  private ReplicationQueue replicationQueue;
   private final ECUnderReplicationHandler ecUnderReplicationHandler;
   private final ECOverReplicationHandler ecOverReplicationHandler;
   private final int maintenanceRedundancy;
@@ -194,8 +187,7 @@ public class ReplicationManager implements SCMService {
     this.legacyReplicationManager = legacyReplicationManager;
     this.ecReplicationCheckHandler = new ECReplicationCheckHandler();
     this.nodeManager = nodeManager;
-    this.underRepQueue = createUnderReplicatedQueue();
-    this.overRepQueue = new LinkedList<>();
+    this.replicationQueue = new ReplicationQueue();
     this.maintenanceRedundancy = rmConf.maintenanceRemainingRedundancy;
     ecUnderReplicationHandler = new ECUnderReplicationHandler(
         ecReplicationCheckHandler, containerPlacement, conf, nodeManager);
@@ -299,11 +291,7 @@ public class ReplicationManager implements SCMService {
     final List<ContainerInfo> containers =
         containerManager.getContainers();
     ReplicationManagerReport report = new ReplicationManagerReport();
-    Queue<ContainerHealthResult.UnderReplicatedHealthResult>
-        underReplicated = createUnderReplicatedQueue();
-    Queue<ContainerHealthResult.OverReplicatedHealthResult> overReplicated =
-        new LinkedList<>();
-
+    ReplicationQueue newRepQueue = new ReplicationQueue();
     for (ContainerInfo c : containers) {
       if (!shouldRun()) {
         break;
@@ -314,7 +302,7 @@ public class ReplicationManager implements SCMService {
         continue;
       }
       try {
-        processContainer(c, underReplicated, overReplicated, report);
+        processContainer(c, newRepQueue, report);
         // TODO - send any commands contained in the health result
       } catch (ContainerNotFoundException e) {
         LOG.error("Container {} not found", c.getContainerID(), e);
@@ -323,8 +311,7 @@ public class ReplicationManager implements SCMService {
     report.setComplete();
     lock.lock();
     try {
-      underRepQueue = underReplicated;
-      overRepQueue = overReplicated;
+      replicationQueue = newRepQueue;
     } finally {
       lock.unlock();
     }
@@ -344,7 +331,7 @@ public class ReplicationManager implements SCMService {
       dequeueUnderReplicatedContainer() {
     lock.lock();
     try {
-      return underRepQueue.poll();
+      return replicationQueue.dequeueUnderReplicatedContainer();
     } finally {
       lock.unlock();
     }
@@ -360,7 +347,7 @@ public class ReplicationManager implements SCMService {
       dequeueOverReplicatedContainer() {
     lock.lock();
     try {
-      return overRepQueue.poll();
+      return replicationQueue.dequeueOverReplicatedContainer();
     } finally {
       lock.unlock();
     }
@@ -389,7 +376,7 @@ public class ReplicationManager implements SCMService {
     underReplicatedHealthResult.incrementRequeueCount();
     lock.lock();
     try {
-      underRepQueue.add(underReplicatedHealthResult);
+      replicationQueue.enqueue(underReplicatedHealthResult);
     } finally {
       lock.unlock();
     }
@@ -399,7 +386,7 @@ public class ReplicationManager implements SCMService {
       .OverReplicatedHealthResult overReplicatedHealthResult) {
     lock.lock();
     try {
-      overRepQueue.add(overReplicatedHealthResult);
+      replicationQueue.enqueue(overReplicatedHealthResult);
     } finally {
       lock.unlock();
     }
@@ -432,9 +419,8 @@ public class ReplicationManager implements SCMService {
   }
 
   protected void processContainer(ContainerInfo containerInfo,
-      Queue<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
-      Queue<ContainerHealthResult.OverReplicatedHealthResult> overRep,
-      ReplicationManagerReport report) throws ContainerNotFoundException {
+      ReplicationQueue repQueue, ReplicationManagerReport report)
+      throws ContainerNotFoundException {
 
     ContainerID containerID = containerInfo.containerID();
     Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
@@ -448,8 +434,7 @@ public class ReplicationManager implements SCMService {
         .setMaintenanceRedundancy(maintenanceRedundancy)
         .setReport(report)
         .setPendingOps(pendingOps)
-        .setUnderRepQueue(underRep)
-        .setOverRepQueue(overRep)
+        .setReplicationQueue(repQueue)
         .build();
     // This will call the chain of container health handlers in turn which
     // will issue commands as needed, update the report and perhaps add
@@ -500,21 +485,6 @@ public class ReplicationManager implements SCMService {
     return ""; // unit test
   }
 
-  /**
-   * Creates a priority queue of UnderReplicatedHealthResult, where the elements
-   * are ordered by the weighted redundancy of the container. This means that
-   * containers with the least remaining redundancy are at the front of the
-   * queue, and will be processed first.
-   * @return An empty instance of a PriorityQueue.
-   */
-  protected PriorityQueue<ContainerHealthResult.UnderReplicatedHealthResult>
-      createUnderReplicatedQueue() {
-    return new PriorityQueue<>(Comparator.comparing(ContainerHealthResult
-            .UnderReplicatedHealthResult::getWeightedRedundancy)
-        .thenComparing(ContainerHealthResult
-            .UnderReplicatedHealthResult::getRequeueCount));
-  }
-
   public ReplicationManagerReport getContainerReport() {
     return containerReport;
   }
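
For illustration only, the build-and-swap pattern that processAll() keeps using after this change can be reduced to the following self-contained sketch. It is not part of the patch: QueueSwapSketch and the String elements are stand-ins for ReplicationQueue and the health-result types. The point it shows is that the fresh queue is populated without holding the lock, and only the reference swap and the dequeue happen under the lock, so callers never observe a half-built queue.

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.locks.ReentrantLock;

    // Sketch of the build-then-swap pattern used by processAll():
    // fill a new queue outside the lock, swap the reference under it.
    public class QueueSwapSketch {
      private final ReentrantLock lock = new ReentrantLock();
      private Queue<String> queue = new ArrayDeque<>();

      public void rebuild(Iterable<String> work) {
        Queue<String> fresh = new ArrayDeque<>();   // built outside the lock
        for (String item : work) {
          fresh.add(item);
        }
        lock.lock();
        try {
          queue = fresh;                            // atomic swap under the lock
        } finally {
          lock.unlock();
        }
      }

      public String dequeue() {
        lock.lock();
        try {
          return queue.poll();                      // null when empty
        } finally {
          lock.unlock();
        }
      }
    }
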
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
new file mode 100644
index 0000000000..d27c1d9c61
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+/**
+ * Object to encapsulate the under and over replication queues used by
+ * replicationManager.
+ */
+public class ReplicationQueue {
+
+  private final Queue<ContainerHealthResult.UnderReplicatedHealthResult>
+      underRepQueue;
+  private final Queue<ContainerHealthResult.OverReplicatedHealthResult>
+      overRepQueue;
+
+  public ReplicationQueue() {
+    underRepQueue = new PriorityQueue<>(
+        Comparator.comparing(ContainerHealthResult
+            .UnderReplicatedHealthResult::getWeightedRedundancy)
+        .thenComparing(ContainerHealthResult
+            .UnderReplicatedHealthResult::getRequeueCount));
+    overRepQueue = new LinkedList<>();
+  }
+
+  public void enqueue(ContainerHealthResult.UnderReplicatedHealthResult
+      underReplicatedHealthResult) {
+    underRepQueue.add(underReplicatedHealthResult);
+  }
+
+  public void enqueue(ContainerHealthResult.OverReplicatedHealthResult
+      overReplicatedHealthResult) {
+    overRepQueue.add(overReplicatedHealthResult);
+  }
+
+  public ContainerHealthResult.UnderReplicatedHealthResult
+      dequeueUnderReplicatedContainer() {
+    return underRepQueue.poll();
+  }
+
+  public ContainerHealthResult.OverReplicatedHealthResult
+      dequeueOverReplicatedContainer() {
+    return overRepQueue.poll();
+  }
+
+  public int underReplicatedQueueSize() {
+    return underRepQueue.size();
+  }
+
+  public int overReplicatedQueueSize() {
+    return overRepQueue.size();
+  }
+
+}
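
As a minimal sketch of the ordering the new class preserves from the removed createUnderReplicatedQueue(): under-replicated entries are served lowest weighted redundancy first, with requeue count as the tie-break, while the over-replicated side stays plain FIFO. This example is not part of the patch; Entry and QueueOrderSketch are stand-ins for ContainerHealthResult.UnderReplicatedHealthResult and ReplicationQueue, using only JDK types so it compiles on its own.

    import java.util.Comparator;
    import java.util.LinkedList;
    import java.util.PriorityQueue;
    import java.util.Queue;

    // Sketch of the queueing policy: priority order for under-replicated
    // entries, FIFO for over-replicated entries.
    public class QueueOrderSketch {
      static final class Entry {
        final int weightedRedundancy;
        final int requeueCount;
        Entry(int weightedRedundancy, int requeueCount) {
          this.weightedRedundancy = weightedRedundancy;
          this.requeueCount = requeueCount;
        }
      }

      public static void main(String[] args) {
        Queue<Entry> underRep = new PriorityQueue<>(
            Comparator.comparingInt((Entry e) -> e.weightedRedundancy)
                .thenComparingInt(e -> e.requeueCount));
        Queue<Entry> overRep = new LinkedList<>();

        underRep.add(new Entry(2, 0));
        underRep.add(new Entry(0, 1));  // least redundancy, served first
        overRep.add(new Entry(5, 0));

        System.out.println(underRep.poll().weightedRedundancy);  // prints 0
      }
    }
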
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
index 67936c8bec..277409ed8d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
@@ -73,7 +73,7 @@ public class ECReplicationCheckHandler extends AbstractCheck {
       //        handlers can be tried?
       if (!underHealth.isSufficientlyReplicatedAfterPending() &&
           !underHealth.isUnrecoverable()) {
-        request.getUnderRepQueue().add(underHealth);
+        request.getReplicationQueue().enqueue(underHealth);
       }
       return true;
     } else if (health.getHealthState()
@@ -83,7 +83,7 @@ public class ECReplicationCheckHandler extends AbstractCheck {
       ContainerHealthResult.OverReplicatedHealthResult overHealth
           = ((ContainerHealthResult.OverReplicatedHealthResult) health);
       if (!overHealth.isSufficientlyReplicatedAfterPending()) {
-        request.getOverRepQueue().add(overHealth);
+        request.getReplicationQueue().enqueue(overHealth);
       }
       return true;
     }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 613817666c..f6c9ad9287 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -48,9 +48,7 @@ import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
@@ -78,8 +76,7 @@ public class TestReplicationManager {
   private Set<ContainerInfo> containerInfoSet;
   private ReplicationConfig repConfig;
   private ReplicationManagerReport repReport;
-  private Queue<ContainerHealthResult.UnderReplicatedHealthResult> underRep;
-  private Queue<ContainerHealthResult.OverReplicatedHealthResult> overRep;
+  private ReplicationQueue repQueue;
 
   @Before
   public void setup() throws IOException {
@@ -119,8 +116,7 @@ public class TestReplicationManager {
     containerInfoSet = new HashSet<>();
     repConfig = new ECReplicationConfig(3, 2);
     repReport = new ReplicationManagerReport();
-    underRep = replicationManager.createUnderReplicatedQueue();
-    overRep = new LinkedList<>();
+    repQueue = new ReplicationQueue();
 
     // Ensure that RM will run when asked.
     Mockito.when(scmContext.isLeaderReady()).thenReturn(true);
@@ -140,9 +136,9 @@ public class TestReplicationManager {
     // It is under replicated, but as its still open it is seen as healthy.
     addReplicas(container, ContainerReplicaProto.State.OPEN, 1, 2, 3, 4);
     replicationManager.processContainer(
-        container, underRep, overRep, repReport);
-    Assert.assertEquals(0, underRep.size());
-    Assert.assertEquals(0, overRep.size());
+        container, repQueue, repReport);
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
   }
 
   @Test
@@ -153,13 +149,13 @@ public class TestReplicationManager {
     // Container is open but replicas are closed, so it is open but unhealthy.
     addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
     replicationManager.processContainer(
-        container, underRep, overRep, repReport);
+        container, repQueue, repReport);
     Mockito.verify(eventPublisher, Mockito.times(1))
         .fireEvent(SCMEvents.CLOSE_CONTAINER, container.containerID());
     Assert.assertEquals(1, repReport.getStat(
         ReplicationManagerReport.HealthState.OPEN_UNHEALTHY));
-    Assert.assertEquals(0, underRep.size());
-    Assert.assertEquals(0, overRep.size());
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
   }
 
   @Test
@@ -169,9 +165,9 @@ public class TestReplicationManager {
     addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4, 5);
 
     replicationManager.processContainer(
-        container, underRep, overRep, repReport);
-    Assert.assertEquals(0, underRep.size());
-    Assert.assertEquals(0, overRep.size());
+        container, repQueue, repReport);
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
   }
 
   @Test
@@ -181,9 +177,9 @@ public class TestReplicationManager {
     addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
 
     replicationManager.processContainer(
-        container, underRep, overRep, repReport);
-    Assert.assertEquals(1, underRep.size());
-    Assert.assertEquals(0, overRep.size());
+        container, repQueue, repReport);
+    Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
     Assert.assertEquals(1, repReport.getStat(
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
   }
@@ -198,11 +194,11 @@ public class TestReplicationManager {
         MockDatanodeDetails.randomDatanodeDetails(), 5);
 
     replicationManager.processContainer(
-        container, underRep, overRep, repReport);
+        container, repQueue, repReport);
     // As the pending replication fixes the under replication, nothing is added
     // to the under replication list.
-    Assert.assertEquals(0, underRep.size());
-    Assert.assertEquals(0, overRep.size());
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
     // As the container is still under replicated, as the pending have not
     // completed yet, the container is still marked as under-replicated in the
     // report.
@@ -218,11 +214,11 @@ public class TestReplicationManager {
     addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2);
 
     replicationManager.processContainer(
-        container, underRep, overRep, repReport);
+        container, repQueue, repReport);
     // If it is unrecoverable, there is no point in putting it into the under
     // replication list. It will be checked again on the next RM run.
-    Assert.assertEquals(0, underRep.size());
-    Assert.assertEquals(0, overRep.size());
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
     Assert.assertEquals(1, repReport.getStat(
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
     Assert.assertEquals(1, repReport.getStat(
@@ -237,12 +233,12 @@ public class TestReplicationManager {
     addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 5, 5);
 
     replicationManager.processContainer(
-        container, underRep, overRep, repReport);
+        container, repQueue, repReport);
     // If it is both under and over replicated, we set it to the most important
     // state, which is under-replicated. When that is fixed, over replication
     // will be handled.
-    Assert.assertEquals(1, underRep.size());
-    Assert.assertEquals(0, overRep.size());
+    Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
     Assert.assertEquals(1, repReport.getStat(
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
     Assert.assertEquals(0, repReport.getStat(
@@ -256,9 +252,9 @@ public class TestReplicationManager {
     addReplicas(container, ContainerReplicaProto.State.CLOSED,
         1, 2, 3, 4, 5, 5);
     replicationManager.processContainer(
-        container, underRep, overRep, repReport);
-    Assert.assertEquals(0, underRep.size());
-    Assert.assertEquals(1, overRep.size());
+        container, repQueue, repReport);
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(1, repQueue.overReplicatedQueueSize());
     Assert.assertEquals(1, repReport.getStat(
         ReplicationManagerReport.HealthState.OVER_REPLICATED));
   }
@@ -273,11 +269,11 @@ public class TestReplicationManager {
     containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(),
         MockDatanodeDetails.randomDatanodeDetails(), 5);
     replicationManager.processContainer(
-        container, underRep, overRep, repReport);
-    Assert.assertEquals(0, underRep.size());
+        container, repQueue, repReport);
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
     // If the pending replication fixes the over-replication, nothing is added
     // to the over replication list.
-    Assert.assertEquals(0, overRep.size());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
     Assert.assertEquals(1, repReport.getStat(
         ReplicationManagerReport.HealthState.OVER_REPLICATED));
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
index 6222c2307e..ddd1e41e99 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
@@ -29,15 +29,14 @@ import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.Un
 import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.HealthState;
 
 import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
 import java.util.Set;
 
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
@@ -56,8 +55,7 @@ public class TestECReplicationCheckHandler {
 
   private ECReplicationCheckHandler healthCheck;
   private ECReplicationConfig repConfig;
-  private Queue<UnderReplicatedHealthResult> underRepQueue;
-  private Queue<OverReplicatedHealthResult> overRepQueue;
+  private ReplicationQueue repQueue;
   private int maintenanceRedundancy = 2;
   private ContainerCheckRequest.Builder requestBuilder;
   private ReplicationManagerReport report;
@@ -67,12 +65,10 @@ public class TestECReplicationCheckHandler {
   public void setup() {
     healthCheck = new ECReplicationCheckHandler();
     repConfig = new ECReplicationConfig(3, 2);
-    underRepQueue = new LinkedList<>();
-    overRepQueue = new LinkedList<>();
+    repQueue = new ReplicationQueue();
     report = new ReplicationManagerReport();
     requestBuilder = new ContainerCheckRequest.Builder()
-        .setOverRepQueue(overRepQueue)
-        .setUnderRepQueue(underRepQueue)
+        .setReplicationQueue(repQueue)
         .setMaintenanceRedundancy(maintenanceRedundancy)
         .setPendingOps(Collections.emptyList())
         .setReport(report);
@@ -92,8 +88,8 @@ public class TestECReplicationCheckHandler {
     Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
 
     Assert.assertFalse(healthCheck.handle(request));
-    Assert.assertEquals(0, underRepQueue.size());
-    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
   }
 
   @Test
@@ -113,8 +109,8 @@ public class TestECReplicationCheckHandler {
     Assert.assertFalse(result.underReplicatedDueToDecommission());
 
     Assert.assertTrue(healthCheck.handle(request));
-    Assert.assertEquals(1, underRepQueue.size());
-    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
     Assert.assertEquals(1, report.getStat(
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
   }
@@ -141,8 +137,8 @@ public class TestECReplicationCheckHandler {
 
     Assert.assertTrue(healthCheck.handle(request));
     // Fixed with pending so nothing added to the queue
-    Assert.assertEquals(0, underRepQueue.size());
-    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
     // Still under replicated until the pending complete
     Assert.assertEquals(1, report.getStat(
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
@@ -168,8 +164,8 @@ public class TestECReplicationCheckHandler {
     Assert.assertTrue(result.underReplicatedDueToDecommission());
 
     Assert.assertTrue(healthCheck.handle(request));
-    Assert.assertEquals(1, underRepQueue.size());
-    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
     // Still under replicated until the pending complete
     Assert.assertEquals(1, report.getStat(
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
@@ -200,8 +196,8 @@ public class TestECReplicationCheckHandler {
 
     Assert.assertTrue(healthCheck.handle(request));
     // Fixed with pending so nothing added to the queue
-    Assert.assertEquals(0, underRepQueue.size());
-    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
     // Still under replicated until the pending complete
     Assert.assertEquals(1, report.getStat(
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
@@ -230,8 +226,8 @@ public class TestECReplicationCheckHandler {
     Assert.assertFalse(result.underReplicatedDueToDecommission());
 
     Assert.assertTrue(healthCheck.handle(request));
-    Assert.assertEquals(1, underRepQueue.size());
-    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
     Assert.assertEquals(1, report.getStat(
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
   }
@@ -256,8 +252,8 @@ public class TestECReplicationCheckHandler {
 
     Assert.assertTrue(healthCheck.handle(request));
     // Unrecoverable so not added to the queue
-    Assert.assertEquals(0, underRepQueue.size());
-    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
     Assert.assertEquals(1, report.getStat(
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
     Assert.assertEquals(1, report.getStat(
@@ -285,8 +281,8 @@ public class TestECReplicationCheckHandler {
     Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
 
     Assert.assertTrue(healthCheck.handle(request));
-    Assert.assertEquals(0, underRepQueue.size());
-    Assert.assertEquals(1, overRepQueue.size());
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(1, repQueue.overReplicatedQueueSize());
     Assert.assertEquals(1, report.getStat(
         ReplicationManagerReport.HealthState.OVER_REPLICATED));
   }
@@ -318,8 +314,8 @@ public class TestECReplicationCheckHandler {
     Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
 
     Assert.assertTrue(healthCheck.handle(request));
-    Assert.assertEquals(0, underRepQueue.size());
-    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
     Assert.assertEquals(1, report.getStat(
         ReplicationManagerReport.HealthState.OVER_REPLICATED));
   }
@@ -342,8 +338,8 @@ public class TestECReplicationCheckHandler {
     // As it is maintenance replicas causing the over replication, the container
     // is not really over-replicated.
     Assert.assertFalse(healthCheck.handle(request));
-    Assert.assertEquals(0, underRepQueue.size());
-    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
     Assert.assertEquals(0, report.getStat(
         ReplicationManagerReport.HealthState.OVER_REPLICATED));
   }
@@ -367,8 +363,8 @@ public class TestECReplicationCheckHandler {
     // Under-replicated takes precedence and the over-replication is ignored
     // for now.
     Assert.assertTrue(healthCheck.handle(request));
-    Assert.assertEquals(1, underRepQueue.size());
-    Assert.assertEquals(0, overRepQueue.size());
+    Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+    Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
     Assert.assertEquals(1, report.getStat(
         ReplicationManagerReport.HealthState.UNDER_REPLICATED));
     Assert.assertEquals(0, report.getStat(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org