Posted to commits@ozone.apache.org by um...@apache.org on 2022/11/23 01:33:01 UTC

[ozone] branch master updated: HDDS-7462. EC: Fix Reconstruction Issue with StaleRecoveringContainerScrubbingService (#3939)

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e86860e1b HDDS-7462. EC: Fix Reconstruction Issue with StaleRecoveringContainerScrubbingService (#3939)
1e86860e1b is described below

commit 1e86860e1b4f36ccd98e82d9830e5034c527449c
Author: Swaminathan Balachandran <47...@users.noreply.github.com>
AuthorDate: Tue Nov 22 17:32:56 2022 -0800

    HDDS-7462. EC: Fix Reconstruction Issue with StaleRecoveringContainerScrubbingService (#3939)
---
 .../common/statemachine/DatanodeConfiguration.java |   6 ++
 .../reconstruction/ECContainerOperationClient.java |  48 ++++-----
 .../ECReconstructionCoordinator.java               |  36 +++++--
 .../StaleRecoveringContainerScrubbingService.java  |   5 +-
 ...stStaleRecoveringContainerScrubbingService.java |  38 +++++--
 .../scm/container/CloseContainerEventHandler.java  |   5 +-
 hadoop-hdds/test-utils/pom.xml                     |   5 +
 .../org/apache/ozone/test/GenericTestUtils.java    |  41 ++++++++
 .../ozone/container/TestECContainerRecovery.java   | 113 ++++++++++++++++++++-
 9 files changed, 250 insertions(+), 47 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index 642298af08..f43f952296 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -176,6 +176,12 @@ public class DatanodeConfiguration {
     return Duration.ofMillis(blockDeletionInterval);
   }
 
+  public void setRecoveringContainerScrubInterval(
+          Duration recoveringContainerScrubInterval) {
+    this.recoveringContainerScrubInterval =
+            recoveringContainerScrubInterval.toMillis();
+  }
+
   public Duration getRecoveringContainerScrubInterval() {
     return Duration.ofMillis(recoveringContainerScrubInterval);
   }
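
The new setter mirrors the existing getter, storing the given Duration as milliseconds. A minimal sketch of how a test can tighten the scrub interval (this is essentially what the integration test further below does; the imports are assumed):

    import java.time.Duration;
    import java.time.temporal.ChronoUnit;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;

    OzoneConfiguration conf = new OzoneConfiguration();
    DatanodeConfiguration dnConf = conf.getObject(DatanodeConfiguration.class);
    // Scrub stale RECOVERING containers every 10 seconds instead of the default.
    dnConf.setRecoveringContainerScrubInterval(Duration.of(10, ChronoUnit.SECONDS));
    conf.setFromObject(dnConf);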
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
index 074963ee96..9a0dac41ab 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.container.ec.reconstruction;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.commons.collections.map.SingletonMap;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -32,7 +33,7 @@ import org.apache.hadoop.hdds.utils.HAUtils;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,8 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -127,25 +130,29 @@ public class ECContainerOperationClient implements Closeable {
    * @param repConfig - Replication config.
    * @param encodedToken - Token
    */
-  public void deleteRecoveringContainer(long containerID, DatanodeDetails dn,
-      ECReplicationConfig repConfig, String encodedToken) throws IOException {
+  public void deleteContainerInState(long containerID, DatanodeDetails dn,
+         ECReplicationConfig repConfig, String encodedToken,
+         Set<State> acceptableStates) throws IOException {
     XceiverClientSpi xceiverClient = this.xceiverClientManager
         .acquireClient(singleNodePipeline(dn, repConfig));
     try {
       // Before deleting the recovering container, just make sure that state is
-      // Recovering. There will be still race condition, but this will avoid
-      // most usual case.
+      // Recovering & Unhealthy. There will be still race condition,
+      // but this will avoid most usual case.
       ContainerProtos.ReadContainerResponseProto readContainerResponseProto =
           ContainerProtocolCalls
               .readContainer(xceiverClient, containerID, encodedToken);
-      if (readContainerResponseProto
-          .getContainerData()
-          .getState() == ContainerProtos.ContainerDataProto.State.RECOVERING) {
-        ContainerProtocolCalls
-            .deleteContainer(xceiverClient, containerID, true, encodedToken);
+      State currentState =
+              readContainerResponseProto.getContainerData().getState();
+      if (!Objects.isNull(acceptableStates)
+              && acceptableStates.contains(currentState)) {
+        ContainerProtocolCalls.deleteContainer(xceiverClient, containerID,
+                        true, encodedToken);
       } else {
-        LOG.warn("Container will not be deleted as it is not a recovering"
-            + " container {}", containerID);
+        LOG.warn("Container {} will not be deleted as current state " +
+                "not in acceptable states. Current state: {}, " +
+                "Acceptable States: {}", containerID, currentState,
+                acceptableStates);
       }
     } finally {
       this.xceiverClientManager.releaseClient(xceiverClient, false);
@@ -167,24 +174,19 @@ public class ECContainerOperationClient implements Closeable {
   }
 
   Pipeline singleNodePipeline(DatanodeDetails dn,
-      ECReplicationConfig repConfig, int replicaIndex) {
-
-    ImmutableMap<DatanodeDetails, Integer> dnIndexMap =
-        ImmutableMap.of(dn, replicaIndex);
-    return Pipeline.newBuilder().setId(PipelineID.valueOf(dn.getUuid()))
-        .setReplicationConfig(repConfig).setNodes(ImmutableList.of(dn))
-        .setReplicaIndexes(dnIndexMap)
-        .setState(Pipeline.PipelineState.CLOSED).build();
+      ECReplicationConfig repConfig) {
+    return singleNodePipeline(dn, repConfig, 0);
   }
 
   Pipeline singleNodePipeline(DatanodeDetails dn,
-      ECReplicationConfig repConfig) {
+       ECReplicationConfig repConfig, int replicaIndex) {
     // To get the same client from cache, we try to use the DN UUID as
     // pipelineID for uniqueness. Please note, pipeline does not have any
     // significance after it's close. So, we are ok to use any ID.
     return Pipeline.newBuilder().setId(PipelineID.valueOf(dn.getUuid()))
-        .setReplicationConfig(repConfig).setNodes(ImmutableList.of(dn))
-        .setState(Pipeline.PipelineState.CLOSED).build();
+            .setReplicationConfig(repConfig).setNodes(ImmutableList.of(dn))
+            .setState(Pipeline.PipelineState.CLOSED)
+            .setReplicaIndexes(new SingletonMap(dn, replicaIndex)).build();
   }
 
   public XceiverClientManager getXceiverClientManager() {
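
Two things change in this file: deleteRecoveringContainer is generalized into deleteContainerInState, which takes the set of container states in which deletion is permitted and only warns otherwise, and the two singleNodePipeline overloads now share one implementation, with the no-index variant delegating with replicaIndex 0. A minimal sketch of a delete call site, mirroring the coordinator change below (client, containerID, dn, repConfig and encodedToken are assumed to be in scope):

    import com.google.common.collect.ImmutableSet;
    import java.util.Set;
    import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;

    Set<State> deletable = ImmutableSet.of(State.RECOVERING, State.UNHEALTHY);
    // Deletes the container on dn only if its current state is in 'deletable';
    // otherwise the method logs a warning and leaves the container in place.
    client.deleteContainerInState(containerID, dn, repConfig, encodedToken, deletable);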
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index d5cd2c874e..d151579562 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.ozone.container.ec.reconstruction;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -183,8 +185,10 @@ public class ECReconstructionCoordinator implements Closeable {
       for (DatanodeDetails dn : recoveringContainersCreatedDNs) {
         try {
           containerOperationClient
-              .deleteRecoveringContainer(containerID, dn, repConfig,
-                  containerToken);
+              .deleteContainerInState(containerID, dn, repConfig,
+                  containerToken, ImmutableSet.of(
+                          ContainerProtos.ContainerDataProto.State.UNHEALTHY,
+                          ContainerProtos.ContainerDataProto.State.RECOVERING));
           if (LOG.isDebugEnabled()) {
             LOG.debug("Deleted the container {}, at the target: {}",
                 containerID, dn);
@@ -199,7 +203,20 @@ public class ECReconstructionCoordinator implements Closeable {
 
   }
 
-  void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
+  ECBlockOutputStream getECBlockOutputstream(
+      BlockLocationInfo blockLocationInfo, DatanodeDetails datanodeDetails,
+      ECReplicationConfig repConfig, int replicaIndex, BufferPool bufferPool,
+      OzoneClientConfig configuration) throws IOException {
+    return new ECBlockOutputStream(blockLocationInfo.getBlockID(),
+            this.containerOperationClient.getXceiverClientManager(),
+            this.containerOperationClient
+                    .singleNodePipeline(datanodeDetails, repConfig,
+                            replicaIndex), bufferPool, configuration,
+            blockLocationInfo.getToken(), clientMetrics);
+  }
+
+  @VisibleForTesting
+  public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
       ECReplicationConfig repConfig,
       SortedMap<Integer, DatanodeDetails> targetMap, BlockData[] blockDataGroup)
       throws IOException {
@@ -249,15 +266,12 @@ public class ECReconstructionCoordinator implements Closeable {
                   .getStreamBufferSize()),
               ByteStringConversion.createByteBufferConversion(false));
       for (int i = 0; i < toReconstructIndexes.size(); i++) {
+        int replicaIndex = toReconstructIndexes.get(i);
         DatanodeDetails datanodeDetails =
-            targetMap.get(toReconstructIndexes.get(i));
-        targetBlockStreams[i] =
-            new ECBlockOutputStream(blockLocationInfo.getBlockID(),
-                this.containerOperationClient.getXceiverClientManager(),
-                this.containerOperationClient.
-                    singleNodePipeline(datanodeDetails, repConfig,
-                    toReconstructIndexes.get(i)), bufferPool,
-                configuration, blockLocationInfo.getToken(), clientMetrics);
+            targetMap.get(replicaIndex);
+        targetBlockStreams[i] = getECBlockOutputstream(blockLocationInfo,
+                datanodeDetails, repConfig, replicaIndex, bufferPool,
+                configuration);
         bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
         // Make sure it's clean. Don't want to reuse the erroneously returned
         // buffers from the pool.
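
Extracting stream creation into getECBlockOutputstream and making reconstructECBlockGroup public with @VisibleForTesting lets a test intercept reconstruction. A minimal Mockito sketch of such an intercept (the spy setup here is hypothetical; the integration test below achieves the same via mockFieldReflection):

    import org.mockito.Mockito;

    ECReconstructionCoordinator spy = Mockito.spy(coordinator);
    Mockito.doAnswer(invocation -> {
      // Run assertions or introduce a delay before the real reconstruction.
      return invocation.callRealMethod();
    }).when(spy).reconstructECBlockGroup(Mockito.any(), Mockito.any(),
        Mockito.any(), Mockito.any());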
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java
index b73bc5a435..d78fe8eef5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java
@@ -84,9 +84,8 @@ public class StaleRecoveringContainerScrubbingService
 
     @Override
     public BackgroundTaskResult call() throws Exception {
-      containerSet.getContainer(containerID).delete();
-      containerSet.removeContainer(containerID);
-      LOG.info("Delete stale recovering container {}", containerID);
+      containerSet.getContainer(containerID).markContainerUnhealthy();
+      LOG.info("Stale recovering container {} marked UNHEALTHY", containerID);
       return new BackgroundTaskResult.EmptyTaskResult();
     }
   }
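
This is the key behavioral change: the scrubber no longer deletes a stale RECOVERING container and removes it from the ContainerSet, it only marks it UNHEALTHY. Deleting it here could race with an in-flight reconstruction still writing to the container; marking it UNHEALTHY keeps the replica visible, and the actual cleanup is deferred to ECReconstructionCoordinator, which (per the change above) deletes containers found in either RECOVERING or UNHEALTHY state. A condensed view of the new task body:

    // Inside the scrubbing task: mark, don't delete. The container stays in
    // the ContainerSet until the EC reconstruction coordinator cleans it up.
    containerSet.getContainer(containerID).markContainerUnhealthy();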
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestStaleRecoveringContainerScrubbingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestStaleRecoveringContainerScrubbingService.java
index fea6c776db..1fb57e4a4c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestStaleRecoveringContainerScrubbingService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestStaleRecoveringContainerScrubbingService.java
@@ -50,12 +50,18 @@ import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.RECOVERING;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.Mockito.mock;
@@ -120,10 +126,11 @@ public class TestStaleRecoveringContainerScrubbingService {
   /**
    * A helper method to create a number of containers of given state.
    */
-  private void createTestContainers(
+  private List<Long> createTestContainers(
       ContainerSet containerSet, int num,
       ContainerProtos.ContainerDataProto.State state)
       throws StorageContainerException {
+    List<Long> createdIds = new ArrayList<>();
     int end = containerIdNum + num;
     for (; containerIdNum < end; containerIdNum++) {
       testClock.fastForward(10L);
@@ -139,7 +146,9 @@ public class TestStaleRecoveringContainerScrubbingService {
       recoveringKeyValueContainer.create(
           volumeSet, volumeChoosingPolicy, clusterID);
       containerSet.addContainer(recoveringKeyValueContainer);
+      createdIds.add((long) containerIdNum);
     }
+    return createdIds;
   }
 
   @Test
@@ -153,29 +162,44 @@ public class TestStaleRecoveringContainerScrubbingService {
             Duration.ofSeconds(300).toMillis(),
             containerSet);
     testClock.fastForward(1000L);
-    createTestContainers(containerSet, 5, CLOSED);
+    Map<Long, ContainerProtos.ContainerDataProto.State> containerStateMap =
+            new HashMap<>();
+    containerStateMap.putAll(createTestContainers(containerSet, 5, CLOSED)
+            .stream().collect(Collectors.toMap(i -> i, i -> CLOSED)));
+
     testClock.fastForward(1000L);
     srcss.runPeriodicalTaskNow();
     //closed container should not be scrubbed
     Assert.assertTrue(containerSet.containerCount() == 5);
 
-    createTestContainers(containerSet, 5, RECOVERING);
+    containerStateMap.putAll(createTestContainers(containerSet, 5,
+            RECOVERING).stream()
+            .collect(Collectors.toMap(i -> i, i -> UNHEALTHY)));
     testClock.fastForward(1000L);
     srcss.runPeriodicalTaskNow();
     //recovering container should be scrubbed since recovering timeout
-    Assert.assertTrue(containerSet.containerCount() == 5);
+    Assert.assertTrue(containerSet.containerCount() == 10);
     Iterator<Container<?>> it = containerSet.getContainerIterator();
     while (it.hasNext()) {
       Container<?> entry = it.next();
-      Assert.assertTrue(entry.getContainerState().equals(CLOSED));
+      Assert.assertEquals(entry.getContainerState(),
+              containerStateMap.get(entry.getContainerData().getContainerID()));
     }
 
     //increase recovering timeout
     containerSet.setRecoveringTimeout(2000L);
-    createTestContainers(containerSet, 5, RECOVERING);
+    containerStateMap.putAll(createTestContainers(containerSet, 5,
+            RECOVERING).stream()
+            .collect(Collectors.toMap(i -> i, i -> RECOVERING)));
     testClock.fastForward(1000L);
     srcss.runPeriodicalTaskNow();
     //recovering container should not be scrubbed
-    Assert.assertTrue(containerSet.containerCount() == 10);
+    Assert.assertTrue(containerSet.containerCount() == 15);
+    it = containerSet.getContainerIterator();
+    while (it.hasNext()) {
+      Container<?> entry = it.next();
+      Assert.assertEquals(entry.getContainerState(),
+              containerStateMap.get(entry.getContainerData().getContainerID()));
+    }
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 2ff9b79ce6..a2afb10c55 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -68,8 +68,11 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
 
   @Override
   public void onMessage(ContainerID containerID, EventPublisher publisher) {
-    LOG.info("Close container Event triggered for container : {}", containerID);
+
     try {
+      LOG.info("Close container Event triggered for container : {}, " +
+              "current state: {}", containerID,
+              containerManager.getContainer(containerID).getState());
       // If the container is in OPEN state, FINALIZE it.
       if (containerManager.getContainer(containerID).getState()
           == LifeCycleState.OPEN) {
diff --git a/hadoop-hdds/test-utils/pom.xml b/hadoop-hdds/test-utils/pom.xml
index 9e7674586a..d4d60605a0 100644
--- a/hadoop-hdds/test-utils/pom.xml
+++ b/hadoop-hdds/test-utils/pom.xml
@@ -77,6 +77,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <version>0.8.5</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
index 46f371e6ce..e03f0a7ffe 100644
--- a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
+++ b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
@@ -40,6 +40,9 @@ import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 import org.apache.log4j.WriterAppender;
 import org.junit.Assert;
+import org.mockito.Mockito;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertTrue;
@@ -242,6 +245,44 @@ public abstract class GenericTestUtils {
     setLogLevel(LogManager.getRootLogger(), Level.toLevel(level.toString()));
   }
 
+  public static <T> T mockFieldReflection(Object object, String fieldName)
+          throws NoSuchFieldException, IllegalAccessException {
+    Field field = object.getClass().getDeclaredField(fieldName);
+    boolean isAccessible = field.isAccessible();
+
+    field.setAccessible(true);
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    boolean modifierFieldAccessible = modifiersField.isAccessible();
+    modifiersField.setAccessible(true);
+    int modifierVal = modifiersField.getInt(field);
+    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+    T value = (T) field.get(object);
+    value = Mockito.spy(value);
+    field.set(object, value);
+    modifiersField.setInt(field, modifierVal);
+    modifiersField.setAccessible(modifierFieldAccessible);
+    field.setAccessible(isAccessible);
+    return value;
+  }
+
+  public static <T> T getFieldReflection(Object object, String fieldName)
+          throws NoSuchFieldException, IllegalAccessException {
+    Field field = object.getClass().getDeclaredField(fieldName);
+    boolean isAccessible = field.isAccessible();
+
+    field.setAccessible(true);
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    boolean modifierFieldAccessible = modifiersField.isAccessible();
+    modifiersField.setAccessible(true);
+    int modifierVal = modifiersField.getInt(field);
+    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+    T value = (T) field.get(object);
+    modifiersField.setInt(field, modifierVal);
+    modifiersField.setAccessible(modifierFieldAccessible);
+    field.setAccessible(isAccessible);
+    return value;
+  }
+
   /**
    * Class to capture logs for doing assertions.
    */
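
These helpers read, or replace with a Mockito spy, a private (possibly final) instance field via reflection. One caveat worth noting: the Field.class.getDeclaredField("modifiers") trick used to clear the FINAL bit works on JDK 8/11 but is blocked by reflection filtering on newer JDKs. Usage in the style of the integration test below:

    // Read a private field without modifying it:
    ECReconstructionSupervisor supervisor =
        GenericTestUtils.getFieldReflection(dn.getDatanodeStateMachine(),
            "ecReconstructionSupervisor");

    // Swap a private field for a Mockito spy and get the spy back:
    ECReconstructionCoordinator coordinator =
        GenericTestUtils.mockFieldReflection(supervisor,
            "reconstructionCoordinator");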
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
index 764213c169..cad98924c1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.BucketArgs;
@@ -44,10 +45,16 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionSupervisor;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -55,13 +62,17 @@ import java.time.temporal.ChronoUnit;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_RECOVERING_CONTAINER_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_RECOVERING_CONTAINER_TIMEOUT_DEFAULT;
 
 /**
  * Tests the EC recovery and over replication processing.
@@ -80,6 +91,8 @@ public class TestECContainerRecovery {
   private static int dataBlocks = 3;
   private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
 
+  private static final Logger LOG =
+          LoggerFactory.getLogger(TestECContainerRecovery.class);
   /**
    * Create a MiniDFSCluster for testing.
    */
@@ -94,7 +107,11 @@ public class TestECContainerRecovery {
     clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
     clientConfig.setStreamBufferFlushDelay(false);
     conf.setFromObject(clientConfig);
-
+    DatanodeConfiguration datanodeConfiguration =
+            conf.getObject(DatanodeConfiguration.class);
+    datanodeConfiguration.setRecoveringContainerScrubInterval(
+            Duration.of(10, ChronoUnit.SECONDS));
+    conf.setFromObject(datanodeConfiguration);
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     ReplicationManager.ReplicationManagerConfiguration rmConfig = conf
             .getObject(
@@ -204,7 +221,7 @@ public class TestECContainerRecovery {
     }
     StorageContainerManager scm = cluster.getStorageContainerManager();
 
-    // Shutting sown DN triggers close pipeline and close container.
+    // Shutting down DN triggers close pipeline and close container.
     cluster.shutdownHddsDatanode(pipeline.getFirstNode());
 
     // Make sure container closed.
@@ -239,6 +256,98 @@ public class TestECContainerRecovery {
     waitForContainerCount(5, container.containerID(), scm);
   }
 
+  @Test
+  public void testECContainerRecoveryWithTimedOutRecovery() throws Exception {
+    byte[] inputData = getInputBytes(3);
+    final OzoneBucket bucket = getOzoneBucket();
+    String keyName = UUID.randomUUID().toString();
+    final Pipeline pipeline;
+    ECReplicationConfig repConfig =
+            new ECReplicationConfig(3, 2,
+                    ECReplicationConfig.EcCodec.RS, chunkSize);
+    try (OzoneOutputStream out = bucket
+            .createKey(keyName, 1024, repConfig, new HashMap<>())) {
+      out.write(inputData);
+      pipeline = ((ECKeyOutputStream) out.getOutputStream())
+              .getStreamEntries().get(0).getPipeline();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    List<ContainerInfo> containers =
+            cluster.getStorageContainerManager().getContainerManager()
+                    .getContainers();
+    ContainerInfo container = null;
+    for (ContainerInfo info : containers) {
+      if (info.getPipelineID().getId().equals(pipeline.getId().getId())) {
+        container = info;
+      }
+    }
+    StorageContainerManager scm = cluster.getStorageContainerManager();
+    AtomicReference<HddsDatanodeService> reconstructedDN =
+            new AtomicReference<>();
+    ContainerInfo finalContainer = container;
+    Map<HddsDatanodeService, Long> recoveryTimeoutMap = new HashMap<>();
+    for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+      recoveryTimeoutMap.put(dn, dn.getDatanodeStateMachine().getConf()
+              .getTimeDuration(OZONE_RECOVERING_CONTAINER_TIMEOUT,
+              OZONE_RECOVERING_CONTAINER_TIMEOUT_DEFAULT,
+              TimeUnit.MILLISECONDS));
+      dn.getDatanodeStateMachine().getContainer()
+              .getContainerSet().setRecoveringTimeout(100);
+
+      ECReconstructionSupervisor ecReconstructionSupervisor =
+              GenericTestUtils.getFieldReflection(dn.getDatanodeStateMachine(),
+                      "ecReconstructionSupervisor");
+      ECReconstructionCoordinator coordinator = GenericTestUtils
+              .mockFieldReflection(ecReconstructionSupervisor,
+                      "reconstructionCoordinator");
+
+      Mockito.doAnswer(invocation -> {
+        GenericTestUtils.waitFor(() ->
+                        dn.getDatanodeStateMachine()
+                                .getContainer()
+                                .getContainerSet()
+                                .getContainer(finalContainer.getContainerID())
+                                .getContainerState() ==
+                        ContainerProtos.ContainerDataProto.State.UNHEALTHY,
+                1000, 100000);
+        reconstructedDN.set(dn);
+        invocation.callRealMethod();
+        return null;
+      }).when(coordinator).reconstructECBlockGroup(Mockito.any(), Mockito.any(),
+              Mockito.any(), Mockito.any());
+    }
+
+    // Shutting down DN triggers close pipeline and close container.
+    cluster.shutdownHddsDatanode(pipeline.getFirstNode());
+
+
+
+    // Make sure container closed.
+    waitForSCMContainerState(StorageContainerDatanodeProtocolProtos
+            .ContainerReplicaProto.State.CLOSED, container.containerID());
+    //Temporarily stop the RM process.
+    scm.getReplicationManager().stop();
+
+    // Wait for the lower replication.
+    waitForContainerCount(4, container.containerID(), scm);
+
+    // Start the RM to resume the replication process and wait for the
+    // reconstruction.
+    scm.getReplicationManager().start();
+    GenericTestUtils.waitFor(() -> reconstructedDN.get() != null, 10000,
+            100000);
+    GenericTestUtils.waitFor(() -> reconstructedDN.get()
+            .getDatanodeStateMachine().getContainer().getContainerSet()
+            .getContainer(finalContainer.getContainerID()) == null,
+            10000, 100000);
+    for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+      dn.getDatanodeStateMachine().getContainer().getContainerSet()
+              .setRecoveringTimeout(recoveryTimeoutMap.get(dn));
+    }
+  }
+
   private void waitForDNContainerState(ContainerInfo container,
       StorageContainerManager scm) throws InterruptedException,
       TimeoutException {

