You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2022/09/20 09:04:40 UTC
[ozone] branch master updated: HDDS-7221. EC: ReplicationManager - Encapsulate the under and over rep queues into a queue object (#3758)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 1107fe711c HDDS-7221. EC: ReplicationManager - Encapsulate the under and over rep queues into a queue object (#3758)
1107fe711c is described below
commit 1107fe711ccce396d92af233b1f05cea90f39927
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Tue Sep 20 10:04:33 2022 +0100
HDDS-7221. EC: ReplicationManager - Encapsulate the under and over rep queues into a queue object (#3758)
---
.../replication/ContainerCheckRequest.java | 35 +++--------
.../container/replication/ReplicationManager.java | 54 ++++------------
.../container/replication/ReplicationQueue.java | 73 ++++++++++++++++++++++
.../health/ECReplicationCheckHandler.java | 4 +-
.../replication/TestReplicationManager.java | 62 +++++++++---------
.../health/TestECReplicationCheckHandler.java | 56 ++++++++---------
6 files changed, 149 insertions(+), 135 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerCheckRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerCheckRequest.java
index 0433bd5eee..6053a26d27 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerCheckRequest.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerCheckRequest.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
import java.util.List;
-import java.util.Queue;
import java.util.Set;
/**
@@ -35,10 +34,7 @@ public final class ContainerCheckRequest {
private final List<ContainerReplicaOp> pendingOps;
private final int maintenanceRedundancy;
private final ReplicationManagerReport report;
- private final Queue<ContainerHealthResult.UnderReplicatedHealthResult>
- underRepQueue;
- private final Queue<ContainerHealthResult.OverReplicatedHealthResult>
- overRepQueue;
+ private final ReplicationQueue replicationQueue;
private ContainerCheckRequest(Builder builder) {
@@ -47,8 +43,7 @@ public final class ContainerCheckRequest {
this.pendingOps = builder.pendingOps;
this.maintenanceRedundancy = builder.maintenanceRedundancy;
this.report = builder.report;
- this.overRepQueue = builder.overRepQueue;
- this.underRepQueue = builder.underRepQueue;
+ this.replicationQueue = builder.replicationQueue;
}
public List<ContainerReplicaOp> getPendingOps() {
@@ -71,14 +66,8 @@ public final class ContainerCheckRequest {
return report;
}
- public Queue<ContainerHealthResult.UnderReplicatedHealthResult>
- getUnderRepQueue() {
- return underRepQueue;
- }
-
- public Queue<ContainerHealthResult.OverReplicatedHealthResult>
- getOverRepQueue() {
- return overRepQueue;
+ public ReplicationQueue getReplicationQueue() {
+ return replicationQueue;
}
/**
@@ -91,10 +80,7 @@ public final class ContainerCheckRequest {
private List<ContainerReplicaOp> pendingOps;
private int maintenanceRedundancy;
private ReplicationManagerReport report;
- private Queue<ContainerHealthResult.UnderReplicatedHealthResult>
- underRepQueue;
- private Queue<ContainerHealthResult.OverReplicatedHealthResult>
- overRepQueue;
+ private ReplicationQueue replicationQueue;
public Builder setContainerInfo(ContainerInfo containerInfo) {
this.containerInfo = containerInfo;
@@ -117,15 +103,8 @@ public final class ContainerCheckRequest {
return this;
}
- public Builder setUnderRepQueue(
- Queue<ContainerHealthResult.UnderReplicatedHealthResult> queue) {
- this.underRepQueue = queue;
- return this;
- }
-
- public Builder setOverRepQueue(
- Queue<ContainerHealthResult.OverReplicatedHealthResult> queue) {
- this.overRepQueue = queue;
+ public Builder setReplicationQueue(ReplicationQueue repQueue) {
+ this.replicationQueue = repQueue;
return this;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 699d523e72..bc615c91e8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -58,12 +58,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
-import java.util.Comparator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -146,10 +142,7 @@ public class ReplicationManager implements SCMService {
private final ECReplicationCheckHandler ecReplicationCheckHandler;
private final EventPublisher eventPublisher;
private final ReentrantLock lock = new ReentrantLock();
- private Queue<ContainerHealthResult.UnderReplicatedHealthResult>
- underRepQueue;
- private Queue<ContainerHealthResult.OverReplicatedHealthResult>
- overRepQueue;
+ private ReplicationQueue replicationQueue;
private final ECUnderReplicationHandler ecUnderReplicationHandler;
private final ECOverReplicationHandler ecOverReplicationHandler;
private final int maintenanceRedundancy;
@@ -194,8 +187,7 @@ public class ReplicationManager implements SCMService {
this.legacyReplicationManager = legacyReplicationManager;
this.ecReplicationCheckHandler = new ECReplicationCheckHandler();
this.nodeManager = nodeManager;
- this.underRepQueue = createUnderReplicatedQueue();
- this.overRepQueue = new LinkedList<>();
+ this.replicationQueue = new ReplicationQueue();
this.maintenanceRedundancy = rmConf.maintenanceRemainingRedundancy;
ecUnderReplicationHandler = new ECUnderReplicationHandler(
ecReplicationCheckHandler, containerPlacement, conf, nodeManager);
@@ -299,11 +291,7 @@ public class ReplicationManager implements SCMService {
final List<ContainerInfo> containers =
containerManager.getContainers();
ReplicationManagerReport report = new ReplicationManagerReport();
- Queue<ContainerHealthResult.UnderReplicatedHealthResult>
- underReplicated = createUnderReplicatedQueue();
- Queue<ContainerHealthResult.OverReplicatedHealthResult> overReplicated =
- new LinkedList<>();
-
+ ReplicationQueue newRepQueue = new ReplicationQueue();
for (ContainerInfo c : containers) {
if (!shouldRun()) {
break;
@@ -314,7 +302,7 @@ public class ReplicationManager implements SCMService {
continue;
}
try {
- processContainer(c, underReplicated, overReplicated, report);
+ processContainer(c, newRepQueue, report);
// TODO - send any commands contained in the health result
} catch (ContainerNotFoundException e) {
LOG.error("Container {} not found", c.getContainerID(), e);
@@ -323,8 +311,7 @@ public class ReplicationManager implements SCMService {
report.setComplete();
lock.lock();
try {
- underRepQueue = underReplicated;
- overRepQueue = overReplicated;
+ replicationQueue = newRepQueue;
} finally {
lock.unlock();
}
@@ -344,7 +331,7 @@ public class ReplicationManager implements SCMService {
dequeueUnderReplicatedContainer() {
lock.lock();
try {
- return underRepQueue.poll();
+ return replicationQueue.dequeueUnderReplicatedContainer();
} finally {
lock.unlock();
}
@@ -360,7 +347,7 @@ public class ReplicationManager implements SCMService {
dequeueOverReplicatedContainer() {
lock.lock();
try {
- return overRepQueue.poll();
+ return replicationQueue.dequeueOverReplicatedContainer();
} finally {
lock.unlock();
}
@@ -389,7 +376,7 @@ public class ReplicationManager implements SCMService {
underReplicatedHealthResult.incrementRequeueCount();
lock.lock();
try {
- underRepQueue.add(underReplicatedHealthResult);
+ replicationQueue.enqueue(underReplicatedHealthResult);
} finally {
lock.unlock();
}
@@ -399,7 +386,7 @@ public class ReplicationManager implements SCMService {
.OverReplicatedHealthResult overReplicatedHealthResult) {
lock.lock();
try {
- overRepQueue.add(overReplicatedHealthResult);
+ replicationQueue.enqueue(overReplicatedHealthResult);
} finally {
lock.unlock();
}
@@ -432,9 +419,8 @@ public class ReplicationManager implements SCMService {
}
protected void processContainer(ContainerInfo containerInfo,
- Queue<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
- Queue<ContainerHealthResult.OverReplicatedHealthResult> overRep,
- ReplicationManagerReport report) throws ContainerNotFoundException {
+ ReplicationQueue repQueue, ReplicationManagerReport report)
+ throws ContainerNotFoundException {
ContainerID containerID = containerInfo.containerID();
Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
@@ -448,8 +434,7 @@ public class ReplicationManager implements SCMService {
.setMaintenanceRedundancy(maintenanceRedundancy)
.setReport(report)
.setPendingOps(pendingOps)
- .setUnderRepQueue(underRep)
- .setOverRepQueue(overRep)
+ .setReplicationQueue(repQueue)
.build();
// This will call the chain of container health handlers in turn which
// will issue commands as needed, update the report and perhaps add
@@ -500,21 +485,6 @@ public class ReplicationManager implements SCMService {
return ""; // unit test
}
- /**
- * Creates a priority queue of UnderReplicatedHealthResult, where the elements
- * are ordered by the weighted redundancy of the container. This means that
- * containers with the least remaining redundancy are at the front of the
- * queue, and will be processed first.
- * @return An empty instance of a PriorityQueue.
- */
- protected PriorityQueue<ContainerHealthResult.UnderReplicatedHealthResult>
- createUnderReplicatedQueue() {
- return new PriorityQueue<>(Comparator.comparing(ContainerHealthResult
- .UnderReplicatedHealthResult::getWeightedRedundancy)
- .thenComparing(ContainerHealthResult
- .UnderReplicatedHealthResult::getRequeueCount));
- }
-
public ReplicationManagerReport getContainerReport() {
return containerReport;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
new file mode 100644
index 0000000000..d27c1d9c61
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+/**
+ * Object to encapsulate the under and over replication queues used by
+ * replicationManager.
+ */
+public class ReplicationQueue {
+
+ private final Queue<ContainerHealthResult.UnderReplicatedHealthResult>
+ underRepQueue;
+ private final Queue<ContainerHealthResult.OverReplicatedHealthResult>
+ overRepQueue;
+
+ public ReplicationQueue() {
+ underRepQueue = new PriorityQueue<>(
+ Comparator.comparing(ContainerHealthResult
+ .UnderReplicatedHealthResult::getWeightedRedundancy)
+ .thenComparing(ContainerHealthResult
+ .UnderReplicatedHealthResult::getRequeueCount));
+ overRepQueue = new LinkedList<>();
+ }
+
+ public void enqueue(ContainerHealthResult.UnderReplicatedHealthResult
+ underReplicatedHealthResult) {
+ underRepQueue.add(underReplicatedHealthResult);
+ }
+
+ public void enqueue(ContainerHealthResult.OverReplicatedHealthResult
+ overReplicatedHealthResult) {
+ overRepQueue.add(overReplicatedHealthResult);
+ }
+
+ public ContainerHealthResult.UnderReplicatedHealthResult
+ dequeueUnderReplicatedContainer() {
+ return underRepQueue.poll();
+ }
+
+ public ContainerHealthResult.OverReplicatedHealthResult
+ dequeueOverReplicatedContainer() {
+ return overRepQueue.poll();
+ }
+
+ public int underReplicatedQueueSize() {
+ return underRepQueue.size();
+ }
+
+ public int overReplicatedQueueSize() {
+ return overRepQueue.size();
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
index 67936c8bec..277409ed8d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/ECReplicationCheckHandler.java
@@ -73,7 +73,7 @@ public class ECReplicationCheckHandler extends AbstractCheck {
// handlers can be tried?
if (!underHealth.isSufficientlyReplicatedAfterPending() &&
!underHealth.isUnrecoverable()) {
- request.getUnderRepQueue().add(underHealth);
+ request.getReplicationQueue().enqueue(underHealth);
}
return true;
} else if (health.getHealthState()
@@ -83,7 +83,7 @@ public class ECReplicationCheckHandler extends AbstractCheck {
ContainerHealthResult.OverReplicatedHealthResult overHealth
= ((ContainerHealthResult.OverReplicatedHealthResult) health);
if (!overHealth.isSufficientlyReplicatedAfterPending()) {
- request.getOverRepQueue().add(overHealth);
+ request.getReplicationQueue().enqueue(overHealth);
}
return true;
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 613817666c..f6c9ad9287 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -48,9 +48,7 @@ import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.Map;
-import java.util.Queue;
import java.util.Set;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT;
@@ -78,8 +76,7 @@ public class TestReplicationManager {
private Set<ContainerInfo> containerInfoSet;
private ReplicationConfig repConfig;
private ReplicationManagerReport repReport;
- private Queue<ContainerHealthResult.UnderReplicatedHealthResult> underRep;
- private Queue<ContainerHealthResult.OverReplicatedHealthResult> overRep;
+ private ReplicationQueue repQueue;
@Before
public void setup() throws IOException {
@@ -119,8 +116,7 @@ public class TestReplicationManager {
containerInfoSet = new HashSet<>();
repConfig = new ECReplicationConfig(3, 2);
repReport = new ReplicationManagerReport();
- underRep = replicationManager.createUnderReplicatedQueue();
- overRep = new LinkedList<>();
+ repQueue = new ReplicationQueue();
// Ensure that RM will run when asked.
Mockito.when(scmContext.isLeaderReady()).thenReturn(true);
@@ -140,9 +136,9 @@ public class TestReplicationManager {
// It is under replicated, but as its still open it is seen as healthy.
addReplicas(container, ContainerReplicaProto.State.OPEN, 1, 2, 3, 4);
replicationManager.processContainer(
- container, underRep, overRep, repReport);
- Assert.assertEquals(0, underRep.size());
- Assert.assertEquals(0, overRep.size());
+ container, repQueue, repReport);
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
}
@Test
@@ -153,13 +149,13 @@ public class TestReplicationManager {
// Container is open but replicas are closed, so it is open but unhealthy.
addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
replicationManager.processContainer(
- container, underRep, overRep, repReport);
+ container, repQueue, repReport);
Mockito.verify(eventPublisher, Mockito.times(1))
.fireEvent(SCMEvents.CLOSE_CONTAINER, container.containerID());
Assert.assertEquals(1, repReport.getStat(
ReplicationManagerReport.HealthState.OPEN_UNHEALTHY));
- Assert.assertEquals(0, underRep.size());
- Assert.assertEquals(0, overRep.size());
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
}
@Test
@@ -169,9 +165,9 @@ public class TestReplicationManager {
addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4, 5);
replicationManager.processContainer(
- container, underRep, overRep, repReport);
- Assert.assertEquals(0, underRep.size());
- Assert.assertEquals(0, overRep.size());
+ container, repQueue, repReport);
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
}
@Test
@@ -181,9 +177,9 @@ public class TestReplicationManager {
addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 4);
replicationManager.processContainer(
- container, underRep, overRep, repReport);
- Assert.assertEquals(1, underRep.size());
- Assert.assertEquals(0, overRep.size());
+ container, repQueue, repReport);
+ Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(1, repReport.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
}
@@ -198,11 +194,11 @@ public class TestReplicationManager {
MockDatanodeDetails.randomDatanodeDetails(), 5);
replicationManager.processContainer(
- container, underRep, overRep, repReport);
+ container, repQueue, repReport);
// As the pending replication fixes the under replication, nothing is added
// to the under replication list.
- Assert.assertEquals(0, underRep.size());
- Assert.assertEquals(0, overRep.size());
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
// As the container is still under replicated, as the pending have not
// completed yet, the container is still marked as under-replicated in the
// report.
@@ -218,11 +214,11 @@ public class TestReplicationManager {
addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2);
replicationManager.processContainer(
- container, underRep, overRep, repReport);
+ container, repQueue, repReport);
// If it is unrecoverable, there is no point in putting it into the under
// replication list. It will be checked again on the next RM run.
- Assert.assertEquals(0, underRep.size());
- Assert.assertEquals(0, overRep.size());
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(1, repReport.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
Assert.assertEquals(1, repReport.getStat(
@@ -237,12 +233,12 @@ public class TestReplicationManager {
addReplicas(container, ContainerReplicaProto.State.CLOSED, 1, 2, 3, 5, 5);
replicationManager.processContainer(
- container, underRep, overRep, repReport);
+ container, repQueue, repReport);
// If it is both under and over replicated, we set it to the most important
// state, which is under-replicated. When that is fixed, over replication
// will be handled.
- Assert.assertEquals(1, underRep.size());
- Assert.assertEquals(0, overRep.size());
+ Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(1, repReport.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
Assert.assertEquals(0, repReport.getStat(
@@ -256,9 +252,9 @@ public class TestReplicationManager {
addReplicas(container, ContainerReplicaProto.State.CLOSED,
1, 2, 3, 4, 5, 5);
replicationManager.processContainer(
- container, underRep, overRep, repReport);
- Assert.assertEquals(0, underRep.size());
- Assert.assertEquals(1, overRep.size());
+ container, repQueue, repReport);
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(1, repQueue.overReplicatedQueueSize());
Assert.assertEquals(1, repReport.getStat(
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}
@@ -273,11 +269,11 @@ public class TestReplicationManager {
containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(),
MockDatanodeDetails.randomDatanodeDetails(), 5);
replicationManager.processContainer(
- container, underRep, overRep, repReport);
- Assert.assertEquals(0, underRep.size());
+ container, repQueue, repReport);
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
// If the pending replication fixes the over-replication, nothing is added
// to the over replication list.
- Assert.assertEquals(0, overRep.size());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(1, repReport.getStat(
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
index 6222c2307e..ddd1e41e99 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestECReplicationCheckHandler.java
@@ -29,15 +29,14 @@ import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.Un
import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.HealthState;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Queue;
import java.util.Set;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
@@ -56,8 +55,7 @@ public class TestECReplicationCheckHandler {
private ECReplicationCheckHandler healthCheck;
private ECReplicationConfig repConfig;
- private Queue<UnderReplicatedHealthResult> underRepQueue;
- private Queue<OverReplicatedHealthResult> overRepQueue;
+ private ReplicationQueue repQueue;
private int maintenanceRedundancy = 2;
private ContainerCheckRequest.Builder requestBuilder;
private ReplicationManagerReport report;
@@ -67,12 +65,10 @@ public class TestECReplicationCheckHandler {
public void setup() {
healthCheck = new ECReplicationCheckHandler();
repConfig = new ECReplicationConfig(3, 2);
- underRepQueue = new LinkedList<>();
- overRepQueue = new LinkedList<>();
+ repQueue = new ReplicationQueue();
report = new ReplicationManagerReport();
requestBuilder = new ContainerCheckRequest.Builder()
- .setOverRepQueue(overRepQueue)
- .setUnderRepQueue(underRepQueue)
+ .setReplicationQueue(repQueue)
.setMaintenanceRedundancy(maintenanceRedundancy)
.setPendingOps(Collections.emptyList())
.setReport(report);
@@ -92,8 +88,8 @@ public class TestECReplicationCheckHandler {
Assert.assertEquals(HealthState.HEALTHY, result.getHealthState());
Assert.assertFalse(healthCheck.handle(request));
- Assert.assertEquals(0, underRepQueue.size());
- Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
}
@Test
@@ -113,8 +109,8 @@ public class TestECReplicationCheckHandler {
Assert.assertFalse(result.underReplicatedDueToDecommission());
Assert.assertTrue(healthCheck.handle(request));
- Assert.assertEquals(1, underRepQueue.size());
- Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
}
@@ -141,8 +137,8 @@ public class TestECReplicationCheckHandler {
Assert.assertTrue(healthCheck.handle(request));
// Fixed with pending so nothing added to the queue
- Assert.assertEquals(0, underRepQueue.size());
- Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
// Still under replicated until the pending complete
Assert.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
@@ -168,8 +164,8 @@ public class TestECReplicationCheckHandler {
Assert.assertTrue(result.underReplicatedDueToDecommission());
Assert.assertTrue(healthCheck.handle(request));
- Assert.assertEquals(1, underRepQueue.size());
- Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
// Still under replicated until the pending complete
Assert.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
@@ -200,8 +196,8 @@ public class TestECReplicationCheckHandler {
Assert.assertTrue(healthCheck.handle(request));
// Fixed with pending so nothing added to the queue
- Assert.assertEquals(0, underRepQueue.size());
- Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
// Still under replicated until the pending complete
Assert.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
@@ -230,8 +226,8 @@ public class TestECReplicationCheckHandler {
Assert.assertFalse(result.underReplicatedDueToDecommission());
Assert.assertTrue(healthCheck.handle(request));
- Assert.assertEquals(1, underRepQueue.size());
- Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
}
@@ -256,8 +252,8 @@ public class TestECReplicationCheckHandler {
Assert.assertTrue(healthCheck.handle(request));
// Unrecoverable so not added to the queue
- Assert.assertEquals(0, underRepQueue.size());
- Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
Assert.assertEquals(1, report.getStat(
@@ -285,8 +281,8 @@ public class TestECReplicationCheckHandler {
Assert.assertFalse(result.isSufficientlyReplicatedAfterPending());
Assert.assertTrue(healthCheck.handle(request));
- Assert.assertEquals(0, underRepQueue.size());
- Assert.assertEquals(1, overRepQueue.size());
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(1, repQueue.overReplicatedQueueSize());
Assert.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}
@@ -318,8 +314,8 @@ public class TestECReplicationCheckHandler {
Assert.assertTrue(result.isSufficientlyReplicatedAfterPending());
Assert.assertTrue(healthCheck.handle(request));
- Assert.assertEquals(0, underRepQueue.size());
- Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}
@@ -342,8 +338,8 @@ public class TestECReplicationCheckHandler {
// As it is maintenance replicas causing the over replication, the container
// is not really over-replicated.
Assert.assertFalse(healthCheck.handle(request));
- Assert.assertEquals(0, underRepQueue.size());
- Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(0, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.OVER_REPLICATED));
}
@@ -367,8 +363,8 @@ public class TestECReplicationCheckHandler {
// Under-replicated takes precedence and the over-replication is ignored
// for now.
Assert.assertTrue(healthCheck.handle(request));
- Assert.assertEquals(1, underRepQueue.size());
- Assert.assertEquals(0, overRepQueue.size());
+ Assert.assertEquals(1, repQueue.underReplicatedQueueSize());
+ Assert.assertEquals(0, repQueue.overReplicatedQueueSize());
Assert.assertEquals(1, report.getStat(
ReplicationManagerReport.HealthState.UNDER_REPLICATED));
Assert.assertEquals(0, report.getStat(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org