You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2023/02/21 13:16:19 UTC

[ozone] branch master updated: HDDS-7989. UnhealthyReplicationProcessor retries failure without delay (#4285)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 47a68f8751 HDDS-7989. UnhealthyReplicationProcessor retries failure without delay (#4285)
47a68f8751 is described below

commit 47a68f8751404664f19be617a789d1fe84b8472a
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Tue Feb 21 14:16:12 2023 +0100

    HDDS-7989. UnhealthyReplicationProcessor retries failure without delay (#4285)
---
 .../replication/UnhealthyReplicationProcessor.java |  9 ++++-
 .../replication/TestOverReplicatedProcessor.java   | 32 +++++++++++++-----
 .../replication/TestUnderReplicatedProcessor.java  | 38 ++++++++++++++--------
 3 files changed, 56 insertions(+), 23 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
index 1289981c51..15df85b6f7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java
@@ -27,6 +27,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -80,6 +82,7 @@ public abstract class UnhealthyReplicationProcessor<HealthResult extends
     int failed = 0;
     Map<ContainerHealthResult.HealthState, Integer> healthStateCntMap =
             Maps.newHashMap();
+    List<HealthResult> failedOnes = new LinkedList<>();
     while (true) {
       if (!replicationManager.shouldRun()) {
         break;
@@ -99,9 +102,13 @@ public abstract class UnhealthyReplicationProcessor<HealthResult extends
                    "container {}", healthResult.getClass(),
                 healthResult.getContainerInfo(), e);
         failed++;
-        requeueHealthResultFromQueue(replicationManager, healthResult);
+        failedOnes.add(healthResult);
       }
     }
+
+    failedOnes.forEach(result ->
+        requeueHealthResultFromQueue(replicationManager, result));
+
     LOG.info("Processed {} containers with health state counts {}," +
              "failed processing {}", processed, healthStateCntMap, failed);
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
index 910ba75f98..af230897af 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestOverReplicatedProcessor.java
@@ -25,11 +25,13 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.io.IOException;
@@ -47,6 +49,7 @@ public class TestOverReplicatedProcessor {
   private ReplicationManager replicationManager;
   private ECReplicationConfig repConfig;
   private OverReplicatedProcessor overReplicatedProcessor;
+  private ReplicationQueue queue;
 
   @Before
   public void setup() {
@@ -54,6 +57,20 @@ public class TestOverReplicatedProcessor {
     ReplicationManagerConfiguration rmConf =
         conf.getObject(ReplicationManagerConfiguration.class);
     replicationManager = Mockito.mock(ReplicationManager.class);
+
+    // use real queue
+    queue = new ReplicationQueue();
+    Mockito.when(replicationManager.dequeueOverReplicatedContainer())
+        .thenAnswer(inv -> queue.dequeueOverReplicatedContainer());
+    ArgumentCaptor<OverReplicatedHealthResult> captor =
+        ArgumentCaptor.forClass(OverReplicatedHealthResult.class);
+    Mockito.doAnswer(inv -> {
+      queue.enqueue(captor.getValue());
+      return null;
+    })
+        .when(replicationManager)
+            .requeueOverReplicatedContainer(captor.capture());
+
     repConfig = new ECReplicationConfig(3, 2);
     overReplicatedProcessor = new OverReplicatedProcessor(
         replicationManager, rmConf.getOverReplicatedInterval());
@@ -64,10 +81,8 @@ public class TestOverReplicatedProcessor {
   public void testDeleteContainerCommand() throws IOException {
     ContainerInfo container = ReplicationTestUtil
         .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
-    Mockito.when(replicationManager.dequeueOverReplicatedContainer())
-        .thenReturn(
-            new ContainerHealthResult.OverReplicatedHealthResult(container, 3,
-                false), null);
+    queue.enqueue(new OverReplicatedHealthResult(
+        container, 3, false));
     Set<Pair<DatanodeDetails, SCMCommand<?>>> commands = new HashSet<>();
     DeleteContainerCommand cmd =
         new DeleteContainerCommand(container.getContainerID());
@@ -86,14 +101,13 @@ public class TestOverReplicatedProcessor {
   public void testMessageRequeuedOnException() throws IOException {
     ContainerInfo container = ReplicationTestUtil
         .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
-    Mockito.when(replicationManager.dequeueOverReplicatedContainer())
-        .thenReturn(new ContainerHealthResult
-                .OverReplicatedHealthResult(container, 3, false),
-            null);
+    queue.enqueue(new OverReplicatedHealthResult(
+        container, 3, false));
 
     Mockito.when(replicationManager
             .processOverReplicatedContainer(any()))
-        .thenThrow(new IOException("Test Exception"));
+        .thenThrow(new IOException("Test Exception"))
+        .thenThrow(new AssertionError("Should process only one item"));
     overReplicatedProcessor.processAll();
 
     Mockito.verify(replicationManager, Mockito.times(0))
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
index 48cc5476ab..91a8c62471 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
@@ -25,12 +25,14 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
 import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.io.IOException;
@@ -50,6 +52,7 @@ public class TestUnderReplicatedProcessor {
   private ReplicationManager replicationManager;
   private ECReplicationConfig repConfig;
   private UnderReplicatedProcessor underReplicatedProcessor;
+  private ReplicationQueue queue;
 
   @Before
   public void setup() {
@@ -57,6 +60,20 @@ public class TestUnderReplicatedProcessor {
     ReplicationManagerConfiguration rmConf =
         conf.getObject(ReplicationManagerConfiguration.class);
     replicationManager = Mockito.mock(ReplicationManager.class);
+
+    // use real queue
+    queue = new ReplicationQueue();
+    Mockito.when(replicationManager.dequeueUnderReplicatedContainer())
+        .thenAnswer(inv -> queue.dequeueUnderReplicatedContainer());
+    ArgumentCaptor<UnderReplicatedHealthResult> captor =
+        ArgumentCaptor.forClass(UnderReplicatedHealthResult.class);
+    Mockito.doAnswer(inv -> {
+      queue.enqueue(captor.getValue());
+      return null;
+    })
+        .when(replicationManager)
+        .requeueUnderReplicatedContainer(captor.capture());
+
     repConfig = new ECReplicationConfig(3, 2);
     underReplicatedProcessor = new UnderReplicatedProcessor(
         replicationManager, rmConf.getUnderReplicatedInterval());
@@ -69,10 +86,8 @@ public class TestUnderReplicatedProcessor {
   public void testEcReconstructionCommand() throws IOException {
     ContainerInfo container = ReplicationTestUtil
         .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
-    Mockito.when(replicationManager.dequeueUnderReplicatedContainer())
-        .thenReturn(new ContainerHealthResult
-                .UnderReplicatedHealthResult(container, 3, false, false, false),
-            (ContainerHealthResult.UnderReplicatedHealthResult) null);
+    queue.enqueue(new UnderReplicatedHealthResult(
+        container, 3, false, false, false));
     List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
         sourceNodes = new ArrayList<>();
     for (int i = 1; i <= 3; i++) {
@@ -105,10 +120,8 @@ public class TestUnderReplicatedProcessor {
   public void testEcReplicationCommand() throws IOException {
     ContainerInfo container = ReplicationTestUtil
         .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
-    Mockito.when(replicationManager.dequeueUnderReplicatedContainer())
-        .thenReturn(new ContainerHealthResult
-                .UnderReplicatedHealthResult(container, 3, true, false, false),
-            (ContainerHealthResult.UnderReplicatedHealthResult) null);
+    queue.enqueue(new UnderReplicatedHealthResult(
+        container, 3, true, false, false));
     List<DatanodeDetails> sourceDns = new ArrayList<>();
     sourceDns.add(MockDatanodeDetails.randomDatanodeDetails());
     DatanodeDetails targetDn = MockDatanodeDetails.randomDatanodeDetails();
@@ -134,14 +147,13 @@ public class TestUnderReplicatedProcessor {
   public void testMessageRequeuedOnException() throws IOException {
     ContainerInfo container = ReplicationTestUtil
         .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
-    Mockito.when(replicationManager.dequeueUnderReplicatedContainer())
-        .thenReturn(new ContainerHealthResult
-                .UnderReplicatedHealthResult(container, 3, false, false, false),
-            (ContainerHealthResult.UnderReplicatedHealthResult) null);
+    queue.enqueue(new UnderReplicatedHealthResult(
+            container, 3, false, false, false));
 
     Mockito.when(replicationManager
             .processUnderReplicatedContainer(any()))
-        .thenThrow(new IOException("Test Exception"));
+        .thenThrow(new IOException("Test Exception"))
+        .thenThrow(new AssertionError("Should process only one item"));
     underReplicatedProcessor.processAll();
 
     Mockito.verify(replicationManager, Mockito.times(0))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org