You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@ozone.apache.org by so...@apache.org on 2023/02/21 13:16:19 UTC
[ozone] branch master updated: HDDS-7989. UnhealthyReplicationProcessor retries failure without delay (#4285)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 47a68f8751 HDDS-7989. UnhealthyReplicationProcessor retries failure without delay (#4285)
47a68f8751 is described below
commit 47a68f8751404664f19be617a789d1fe84b8472a
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Tue Feb 21 14:16:12 2023 +0100
HDDS-7989. UnhealthyReplicationProcessor retries failure without delay (#4285)
---
.../replication/UnhealthyReplicationProcessor.java | 9 ++++-
.../replication/TestOverReplicatedProcessor.java | 32 +++++++++++++-----
.../replication/TestUnderReplicatedProcessor.java | 38 ++++++++++++++--------
3 files changed, 56 insertions(+), 23 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
index 1289981c51..15df85b6f7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
@@ -27,6 +27,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -80,6 +82,7 @@ public abstract class UnhealthyReplicationProcessor<HealthResult extends
int failed = 0;
Map<ContainerHealthResult.HealthState, Integer> healthStateCntMap =
Maps.newHashMap();
+ List<HealthResult> failedOnes = new LinkedList<>();
while (true) {
if (!replicationManager.shouldRun()) {
break;
@@ -99,9 +102,13 @@ public abstract class UnhealthyReplicationProcessor<HealthResult extends
"container {}", healthResult.getClass(),
healthResult.getContainerInfo(), e);
failed++;
- requeueHealthResultFromQueue(replicationManager, healthResult);
+ failedOnes.add(healthResult);
}
}
+
+ failedOnes.forEach(result ->
+ requeueHealthResultFromQueue(replicationManager, result));
+
LOG.info("Processed {} containers with health state counts {}," +
"failed processing {}", processed, healthStateCntMap, failed);
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
index 910ba75f98..af230897af 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
@@ -25,11 +25,13 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.IOException;
@@ -47,6 +49,7 @@ public class TestOverReplicatedProcessor {
private ReplicationManager replicationManager;
private ECReplicationConfig repConfig;
private OverReplicatedProcessor overReplicatedProcessor;
+ private ReplicationQueue queue;
@Before
public void setup() {
@@ -54,6 +57,20 @@ public class TestOverReplicatedProcessor {
ReplicationManagerConfiguration rmConf =
conf.getObject(ReplicationManagerConfiguration.class);
replicationManager = Mockito.mock(ReplicationManager.class);
+
+ // use real queue
+ queue = new ReplicationQueue();
+ Mockito.when(replicationManager.dequeueOverReplicatedContainer())
+ .thenAnswer(inv -> queue.dequeueOverReplicatedContainer());
+ ArgumentCaptor<OverReplicatedHealthResult> captor =
+ ArgumentCaptor.forClass(OverReplicatedHealthResult.class);
+ Mockito.doAnswer(inv -> {
+ queue.enqueue(captor.getValue());
+ return null;
+ })
+ .when(replicationManager)
+ .requeueOverReplicatedContainer(captor.capture());
+
repConfig = new ECReplicationConfig(3, 2);
overReplicatedProcessor = new OverReplicatedProcessor(
replicationManager, rmConf.getOverReplicatedInterval());
@@ -64,10 +81,8 @@ public class TestOverReplicatedProcessor {
public void testDeleteContainerCommand() throws IOException {
ContainerInfo container = ReplicationTestUtil
.createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
- Mockito.when(replicationManager.dequeueOverReplicatedContainer())
- .thenReturn(
- new ContainerHealthResult.OverReplicatedHealthResult(container, 3,
- false), null);
+ queue.enqueue(new OverReplicatedHealthResult(
+ container, 3, false));
Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
DeleteContainerCommand cmd =
new DeleteContainerCommand(container.getContainerID());
@@ -86,14 +101,13 @@ public class TestOverReplicatedProcessor {
public void testMessageRequeuedOnException() throws IOException {
ContainerInfo container = ReplicationTestUtil
.createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
- Mockito.when(replicationManager.dequeueOverReplicatedContainer())
- .thenReturn(new ContainerHealthResult
- .OverReplicatedHealthResult(container, 3, false),
- null);
+ queue.enqueue(new OverReplicatedHealthResult(
+ container, 3, false));
Mockito.when(replicationManager
.processOverReplicatedContainer(any()))
- .thenThrow(new IOException("Test Exception"));
+ .thenThrow(new IOException("Test Exception"))
+ .thenThrow(new AssertionError("Should process only one item"));
overReplicatedProcessor.processAll();
Mockito.verify(replicationManager, Mockito.times(0))
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
index 48cc5476ab..91a8c62471 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
@@ -25,12 +25,14 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import java.io.IOException;
@@ -50,6 +52,7 @@ public class TestUnderReplicatedProcessor {
private ReplicationManager replicationManager;
private ECReplicationConfig repConfig;
private UnderReplicatedProcessor underReplicatedProcessor;
+ private ReplicationQueue queue;
@Before
public void setup() {
@@ -57,6 +60,20 @@ public class TestUnderReplicatedProcessor {
ReplicationManagerConfiguration rmConf =
conf.getObject(ReplicationManagerConfiguration.class);
replicationManager = Mockito.mock(ReplicationManager.class);
+
+ // use real queue
+ queue = new ReplicationQueue();
+ Mockito.when(replicationManager.dequeueUnderReplicatedContainer())
+ .thenAnswer(inv -> queue.dequeueUnderReplicatedContainer());
+ ArgumentCaptor<UnderReplicatedHealthResult> captor =
+ ArgumentCaptor.forClass(UnderReplicatedHealthResult.class);
+ Mockito.doAnswer(inv -> {
+ queue.enqueue(captor.getValue());
+ return null;
+ })
+ .when(replicationManager)
+ .requeueUnderReplicatedContainer(captor.capture());
+
repConfig = new ECReplicationConfig(3, 2);
underReplicatedProcessor = new UnderReplicatedProcessor(
replicationManager, rmConf.getUnderReplicatedInterval());
@@ -69,10 +86,8 @@ public class TestUnderReplicatedProcessor {
public void testEcReconstructionCommand() throws IOException {
ContainerInfo container = ReplicationTestUtil
.createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
- Mockito.when(replicationManager.dequeueUnderReplicatedContainer())
- .thenReturn(new ContainerHealthResult
- .UnderReplicatedHealthResult(container, 3, false, false, false),
- (ContainerHealthResult.UnderReplicatedHealthResult) null);
+ queue.enqueue(new UnderReplicatedHealthResult(
+ container, 3, false, false, false));
List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
sourceNodes = new ArrayList<>();
for (int i = 1; i <= 3; i++) {
@@ -105,10 +120,8 @@ public class TestUnderReplicatedProcessor {
public void testEcReplicationCommand() throws IOException {
ContainerInfo container = ReplicationTestUtil
.createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
- Mockito.when(replicationManager.dequeueUnderReplicatedContainer())
- .thenReturn(new ContainerHealthResult
- .UnderReplicatedHealthResult(container, 3, true, false, false),
- (ContainerHealthResult.UnderReplicatedHealthResult) null);
+ queue.enqueue(new UnderReplicatedHealthResult(
+ container, 3, true, false, false));
List<DatanodeDetails> sourceDns = new ArrayList<>();
sourceDns.add(MockDatanodeDetails.randomDatanodeDetails());
DatanodeDetails targetDn = MockDatanodeDetails.randomDatanodeDetails();
@@ -134,14 +147,13 @@ public class TestUnderReplicatedProcessor {
public void testMessageRequeuedOnException() throws IOException {
ContainerInfo container = ReplicationTestUtil
.createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
- Mockito.when(replicationManager.dequeueUnderReplicatedContainer())
- .thenReturn(new ContainerHealthResult
- .UnderReplicatedHealthResult(container, 3, false, false, false),
- (ContainerHealthResult.UnderReplicatedHealthResult) null);
+ queue.enqueue(new UnderReplicatedHealthResult(
+ container, 3, false, false, false));
Mockito.when(replicationManager
.processUnderReplicatedContainer(any()))
- .thenThrow(new IOException("Test Exception"));
+ .thenThrow(new IOException("Test Exception"))
+ .thenThrow(new AssertionError("Should process only one item"));
underReplicatedProcessor.processAll();
Mockito.verify(replicationManager, Mockito.times(0))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org