You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by um...@apache.org on 2022/11/23 01:33:01 UTC
[ozone] branch master updated: HDDS-7462. EC: Fix Reconstruction Issue with StaleRecoveringContainerScrubbingService (#3939)
This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 1e86860e1b HDDS-7462. EC: Fix Reconstruction Issue with StaleRecoveringContainerScrubbingService (#3939)
1e86860e1b is described below
commit 1e86860e1b4f36ccd98e82d9830e5034c527449c
Author: Swaminathan Balachandran <47...@users.noreply.github.com>
AuthorDate: Tue Nov 22 17:32:56 2022 -0800
HDDS-7462. EC: Fix Reconstruction Issue with StaleRecoveringContainerScrubbingService (#3939)
---
.../common/statemachine/DatanodeConfiguration.java | 6 ++
.../reconstruction/ECContainerOperationClient.java | 48 ++++-----
.../ECReconstructionCoordinator.java | 36 +++++--
.../StaleRecoveringContainerScrubbingService.java | 5 +-
...stStaleRecoveringContainerScrubbingService.java | 38 +++++--
.../scm/container/CloseContainerEventHandler.java | 5 +-
hadoop-hdds/test-utils/pom.xml | 5 +
.../org/apache/ozone/test/GenericTestUtils.java | 41 ++++++++
.../ozone/container/TestECContainerRecovery.java | 113 ++++++++++++++++++++-
9 files changed, 250 insertions(+), 47 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index 642298af08..f43f952296 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -176,6 +176,12 @@ public class DatanodeConfiguration {
return Duration.ofMillis(blockDeletionInterval);
}
+ public void setRecoveringContainerScrubInterval(
+ Duration recoveringContainerScrubInterval) {
+ this.recoveringContainerScrubInterval =
+ recoveringContainerScrubInterval.toMillis();
+ }
+
public Duration getRecoveringContainerScrubInterval() {
return Duration.ofMillis(recoveringContainerScrubInterval);
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
index 074963ee96..9a0dac41ab 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECContainerOperationClient.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.ec.reconstruction;
import com.google.common.collect.ImmutableList;
+import org.apache.commons.collections.map.SingletonMap;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -32,7 +33,7 @@ import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,8 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -127,25 +130,29 @@ public class ECContainerOperationClient implements Closeable {
* @param repConfig - Replication config.
* @param encodedToken - Token
*/
- public void deleteRecoveringContainer(long containerID, DatanodeDetails dn,
- ECReplicationConfig repConfig, String encodedToken) throws IOException {
+ public void deleteContainerInState(long containerID, DatanodeDetails dn,
+ ECReplicationConfig repConfig, String encodedToken,
+ Set<State> acceptableStates) throws IOException {
XceiverClientSpi xceiverClient = this.xceiverClientManager
.acquireClient(singleNodePipeline(dn, repConfig));
try {
// Before deleting the recovering container, just make sure that state is
- // Recovering. There will be still race condition, but this will avoid
- // most usual case.
+ // Recovering or Unhealthy. A race condition is still possible,
+ // but this check avoids the most common case.
ContainerProtos.ReadContainerResponseProto readContainerResponseProto =
ContainerProtocolCalls
.readContainer(xceiverClient, containerID, encodedToken);
- if (readContainerResponseProto
- .getContainerData()
- .getState() == ContainerProtos.ContainerDataProto.State.RECOVERING) {
- ContainerProtocolCalls
- .deleteContainer(xceiverClient, containerID, true, encodedToken);
+ State currentState =
+ readContainerResponseProto.getContainerData().getState();
+ if (!Objects.isNull(acceptableStates)
+ && acceptableStates.contains(currentState)) {
+ ContainerProtocolCalls.deleteContainer(xceiverClient, containerID,
+ true, encodedToken);
} else {
- LOG.warn("Container will not be deleted as it is not a recovering"
- + " container {}", containerID);
+ LOG.warn("Container {} will not be deleted as current state " +
+ "not in acceptable states. Current state: {}, " +
+ "Acceptable States: {}", containerID, currentState,
+ acceptableStates);
}
} finally {
this.xceiverClientManager.releaseClient(xceiverClient, false);
@@ -167,24 +174,19 @@ public class ECContainerOperationClient implements Closeable {
}
Pipeline singleNodePipeline(DatanodeDetails dn,
- ECReplicationConfig repConfig, int replicaIndex) {
-
- ImmutableMap<DatanodeDetails, Integer> dnIndexMap =
- ImmutableMap.of(dn, replicaIndex);
- return Pipeline.newBuilder().setId(PipelineID.valueOf(dn.getUuid()))
- .setReplicationConfig(repConfig).setNodes(ImmutableList.of(dn))
- .setReplicaIndexes(dnIndexMap)
- .setState(Pipeline.PipelineState.CLOSED).build();
+ ECReplicationConfig repConfig) {
+ return singleNodePipeline(dn, repConfig, 0);
}
Pipeline singleNodePipeline(DatanodeDetails dn,
- ECReplicationConfig repConfig) {
+ ECReplicationConfig repConfig, int replicaIndex) {
// To get the same client from cache, we try to use the DN UUID as
// pipelineID for uniqueness. Please note, pipeline does not have any
// significance after it's close. So, we are ok to use any ID.
return Pipeline.newBuilder().setId(PipelineID.valueOf(dn.getUuid()))
- .setReplicationConfig(repConfig).setNodes(ImmutableList.of(dn))
- .setState(Pipeline.PipelineState.CLOSED).build();
+ .setReplicationConfig(repConfig).setNodes(ImmutableList.of(dn))
+ .setState(Pipeline.PipelineState.CLOSED)
+ .setReplicaIndexes(new SingletonMap(dn, replicaIndex)).build();
}
public XceiverClientManager getXceiverClientManager() {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
index d5cd2c874e..d151579562 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ec/reconstruction/ECReconstructionCoordinator.java
@@ -17,7 +17,9 @@
*/
package org.apache.hadoop.ozone.container.ec.reconstruction;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
@@ -183,8 +185,10 @@ public class ECReconstructionCoordinator implements Closeable {
for (DatanodeDetails dn : recoveringContainersCreatedDNs) {
try {
containerOperationClient
- .deleteRecoveringContainer(containerID, dn, repConfig,
- containerToken);
+ .deleteContainerInState(containerID, dn, repConfig,
+ containerToken, ImmutableSet.of(
+ ContainerProtos.ContainerDataProto.State.UNHEALTHY,
+ ContainerProtos.ContainerDataProto.State.RECOVERING));
if (LOG.isDebugEnabled()) {
LOG.debug("Deleted the container {}, at the target: {}",
containerID, dn);
@@ -199,7 +203,20 @@ public class ECReconstructionCoordinator implements Closeable {
}
- void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
+ ECBlockOutputStream getECBlockOutputstream(
+ BlockLocationInfo blockLocationInfo, DatanodeDetails datanodeDetails,
+ ECReplicationConfig repConfig, int replicaIndex, BufferPool bufferPool,
+ OzoneClientConfig configuration) throws IOException {
+ return new ECBlockOutputStream(blockLocationInfo.getBlockID(),
+ this.containerOperationClient.getXceiverClientManager(),
+ this.containerOperationClient
+ .singleNodePipeline(datanodeDetails, repConfig,
+ replicaIndex), bufferPool, configuration,
+ blockLocationInfo.getToken(), clientMetrics);
+ }
+
+ @VisibleForTesting
+ public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
ECReplicationConfig repConfig,
SortedMap<Integer, DatanodeDetails> targetMap, BlockData[] blockDataGroup)
throws IOException {
@@ -249,15 +266,12 @@ public class ECReconstructionCoordinator implements Closeable {
.getStreamBufferSize()),
ByteStringConversion.createByteBufferConversion(false));
for (int i = 0; i < toReconstructIndexes.size(); i++) {
+ int replicaIndex = toReconstructIndexes.get(i);
DatanodeDetails datanodeDetails =
- targetMap.get(toReconstructIndexes.get(i));
- targetBlockStreams[i] =
- new ECBlockOutputStream(blockLocationInfo.getBlockID(),
- this.containerOperationClient.getXceiverClientManager(),
- this.containerOperationClient.
- singleNodePipeline(datanodeDetails, repConfig,
- toReconstructIndexes.get(i)), bufferPool,
- configuration, blockLocationInfo.getToken(), clientMetrics);
+ targetMap.get(replicaIndex);
+ targetBlockStreams[i] = getECBlockOutputstream(blockLocationInfo,
+ datanodeDetails, repConfig, replicaIndex, bufferPool,
+ configuration);
bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
// Make sure it's clean. Don't want to reuse the erroneously returned
// buffers from the pool.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java
index b73bc5a435..d78fe8eef5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/StaleRecoveringContainerScrubbingService.java
@@ -84,9 +84,8 @@ public class StaleRecoveringContainerScrubbingService
@Override
public BackgroundTaskResult call() throws Exception {
- containerSet.getContainer(containerID).delete();
- containerSet.removeContainer(containerID);
- LOG.info("Delete stale recovering container {}", containerID);
+ containerSet.getContainer(containerID).markContainerUnhealthy();
+ LOG.info("Stale recovering container {} marked UNHEALTHY", containerID);
return new BackgroundTaskResult.EmptyTaskResult();
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestStaleRecoveringContainerScrubbingService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestStaleRecoveringContainerScrubbingService.java
index fea6c776db..1fb57e4a4c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestStaleRecoveringContainerScrubbingService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestStaleRecoveringContainerScrubbingService.java
@@ -50,12 +50,18 @@ import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.RECOVERING;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
@@ -120,10 +126,11 @@ public class TestStaleRecoveringContainerScrubbingService {
/**
* A helper method to create a number of containers of given state.
*/
- private void createTestContainers(
+ private List<Long> createTestContainers(
ContainerSet containerSet, int num,
ContainerProtos.ContainerDataProto.State state)
throws StorageContainerException {
+ List<Long> createdIds = new ArrayList<>();
int end = containerIdNum + num;
for (; containerIdNum < end; containerIdNum++) {
testClock.fastForward(10L);
@@ -139,7 +146,9 @@ public class TestStaleRecoveringContainerScrubbingService {
recoveringKeyValueContainer.create(
volumeSet, volumeChoosingPolicy, clusterID);
containerSet.addContainer(recoveringKeyValueContainer);
+ createdIds.add((long) containerIdNum);
}
+ return createdIds;
}
@Test
@@ -153,29 +162,44 @@ public class TestStaleRecoveringContainerScrubbingService {
Duration.ofSeconds(300).toMillis(),
containerSet);
testClock.fastForward(1000L);
- createTestContainers(containerSet, 5, CLOSED);
+ Map<Long, ContainerProtos.ContainerDataProto.State> containerStateMap =
+ new HashMap<>();
+ containerStateMap.putAll(createTestContainers(containerSet, 5, CLOSED)
+ .stream().collect(Collectors.toMap(i -> i, i -> CLOSED)));
+
testClock.fastForward(1000L);
srcss.runPeriodicalTaskNow();
//closed container should not be scrubbed
Assert.assertTrue(containerSet.containerCount() == 5);
- createTestContainers(containerSet, 5, RECOVERING);
+ containerStateMap.putAll(createTestContainers(containerSet, 5,
+ RECOVERING).stream()
+ .collect(Collectors.toMap(i -> i, i -> UNHEALTHY)));
testClock.fastForward(1000L);
srcss.runPeriodicalTaskNow();
//recovering container should be scrubbed since recovering timeout
- Assert.assertTrue(containerSet.containerCount() == 5);
+ Assert.assertTrue(containerSet.containerCount() == 10);
Iterator<Container<?>> it = containerSet.getContainerIterator();
while (it.hasNext()) {
Container<?> entry = it.next();
- Assert.assertTrue(entry.getContainerState().equals(CLOSED));
+ Assert.assertEquals(entry.getContainerState(),
+ containerStateMap.get(entry.getContainerData().getContainerID()));
}
//increase recovering timeout
containerSet.setRecoveringTimeout(2000L);
- createTestContainers(containerSet, 5, RECOVERING);
+ containerStateMap.putAll(createTestContainers(containerSet, 5,
+ RECOVERING).stream()
+ .collect(Collectors.toMap(i -> i, i -> RECOVERING)));
testClock.fastForward(1000L);
srcss.runPeriodicalTaskNow();
//recovering container should not be scrubbed
- Assert.assertTrue(containerSet.containerCount() == 10);
+ Assert.assertTrue(containerSet.containerCount() == 15);
+ it = containerSet.getContainerIterator();
+ while (it.hasNext()) {
+ Container<?> entry = it.next();
+ Assert.assertEquals(entry.getContainerState(),
+ containerStateMap.get(entry.getContainerData().getContainerID()));
+ }
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 2ff9b79ce6..a2afb10c55 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -68,8 +68,11 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
@Override
public void onMessage(ContainerID containerID, EventPublisher publisher) {
- LOG.info("Close container Event triggered for container : {}", containerID);
+
try {
+ LOG.info("Close container Event triggered for container : {}, " +
+ "current state: {}", containerID,
+ containerManager.getContainer(containerID).getState());
// If the container is in OPEN state, FINALIZE it.
if (containerManager.getContainer(containerID).getState()
== LifeCycleState.OPEN) {
diff --git a/hadoop-hdds/test-utils/pom.xml b/hadoop-hdds/test-utils/pom.xml
index 9e7674586a..d4d60605a0 100644
--- a/hadoop-hdds/test-utils/pom.xml
+++ b/hadoop-hdds/test-utils/pom.xml
@@ -77,6 +77,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<version>0.8.5</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
index 46f371e6ce..e03f0a7ffe 100644
--- a/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
+++ b/hadoop-hdds/test-utils/src/main/java/org/apache/ozone/test/GenericTestUtils.java
@@ -40,6 +40,9 @@ import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.log4j.WriterAppender;
import org.junit.Assert;
+import org.mockito.Mockito;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertTrue;
@@ -242,6 +245,44 @@ public abstract class GenericTestUtils {
setLogLevel(LogManager.getRootLogger(), Level.toLevel(level.toString()));
}
+ public static <T> T mockFieldReflection(Object object, String fieldName)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field field = object.getClass().getDeclaredField(fieldName);
+ boolean isAccessible = field.isAccessible();
+
+ field.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ boolean modifierFieldAccessible = modifiersField.isAccessible();
+ modifiersField.setAccessible(true);
+ int modifierVal = modifiersField.getInt(field);
+ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+ T value = (T) field.get(object);
+ value = Mockito.spy(value);
+ field.set(object, value);
+ modifiersField.setInt(field, modifierVal);
+ modifiersField.setAccessible(modifierFieldAccessible);
+ field.setAccessible(isAccessible);
+ return value;
+ }
+
+ public static <T> T getFieldReflection(Object object, String fieldName)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field field = object.getClass().getDeclaredField(fieldName);
+ boolean isAccessible = field.isAccessible();
+
+ field.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ boolean modifierFieldAccessible = modifiersField.isAccessible();
+ modifiersField.setAccessible(true);
+ int modifierVal = modifiersField.getInt(field);
+ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+ T value = (T) field.get(object);
+ modifiersField.setInt(field, modifierVal);
+ modifiersField.setAccessible(modifierFieldAccessible);
+ field.setAccessible(isAccessible);
+ return value;
+ }
+
/**
* Class to capture logs for doing assertions.
*/
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
index 764213c169..cad98924c1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestECContainerRecovery.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.BucketArgs;
@@ -44,10 +45,16 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
+import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionSupervisor;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
@@ -55,13 +62,17 @@ import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_RECOVERING_CONTAINER_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_RECOVERING_CONTAINER_TIMEOUT_DEFAULT;
/**
* Tests the EC recovery and over replication processing.
@@ -80,6 +91,8 @@ public class TestECContainerRecovery {
private static int dataBlocks = 3;
private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestECContainerRecovery.class);
/**
* Create a MiniDFSCluster for testing.
*/
@@ -94,7 +107,11 @@ public class TestECContainerRecovery {
clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
clientConfig.setStreamBufferFlushDelay(false);
conf.setFromObject(clientConfig);
-
+ DatanodeConfiguration datanodeConfiguration =
+ conf.getObject(DatanodeConfiguration.class);
+ datanodeConfiguration.setRecoveringContainerScrubInterval(
+ Duration.of(10, ChronoUnit.SECONDS));
+ conf.setFromObject(datanodeConfiguration);
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
ReplicationManager.ReplicationManagerConfiguration rmConfig = conf
.getObject(
@@ -204,7 +221,7 @@ public class TestECContainerRecovery {
}
StorageContainerManager scm = cluster.getStorageContainerManager();
- // Shutting sown DN triggers close pipeline and close container.
+ // Shutting down DN triggers close pipeline and close container.
cluster.shutdownHddsDatanode(pipeline.getFirstNode());
// Make sure container closed.
@@ -239,6 +256,98 @@ public class TestECContainerRecovery {
waitForContainerCount(5, container.containerID(), scm);
}
+ @Test
+ public void testECContainerRecoveryWithTimedOutRecovery() throws Exception {
+ byte[] inputData = getInputBytes(3);
+ final OzoneBucket bucket = getOzoneBucket();
+ String keyName = UUID.randomUUID().toString();
+ final Pipeline pipeline;
+ ECReplicationConfig repConfig =
+ new ECReplicationConfig(3, 2,
+ ECReplicationConfig.EcCodec.RS, chunkSize);
+ try (OzoneOutputStream out = bucket
+ .createKey(keyName, 1024, repConfig, new HashMap<>())) {
+ out.write(inputData);
+ pipeline = ((ECKeyOutputStream) out.getOutputStream())
+ .getStreamEntries().get(0).getPipeline();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ List<ContainerInfo> containers =
+ cluster.getStorageContainerManager().getContainerManager()
+ .getContainers();
+ ContainerInfo container = null;
+ for (ContainerInfo info : containers) {
+ if (info.getPipelineID().getId().equals(pipeline.getId().getId())) {
+ container = info;
+ }
+ }
+ StorageContainerManager scm = cluster.getStorageContainerManager();
+ AtomicReference<HddsDatanodeService> reconstructedDN =
+ new AtomicReference<>();
+ ContainerInfo finalContainer = container;
+ Map<HddsDatanodeService, Long> recoveryTimeoutMap = new HashMap<>();
+ for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+ recoveryTimeoutMap.put(dn, dn.getDatanodeStateMachine().getConf()
+ .getTimeDuration(OZONE_RECOVERING_CONTAINER_TIMEOUT,
+ OZONE_RECOVERING_CONTAINER_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS));
+ dn.getDatanodeStateMachine().getContainer()
+ .getContainerSet().setRecoveringTimeout(100);
+
+ ECReconstructionSupervisor ecReconstructionSupervisor =
+ GenericTestUtils.getFieldReflection(dn.getDatanodeStateMachine(),
+ "ecReconstructionSupervisor");
+ ECReconstructionCoordinator coordinator = GenericTestUtils
+ .mockFieldReflection(ecReconstructionSupervisor,
+ "reconstructionCoordinator");
+
+ Mockito.doAnswer(invocation -> {
+ GenericTestUtils.waitFor(() ->
+ dn.getDatanodeStateMachine()
+ .getContainer()
+ .getContainerSet()
+ .getContainer(finalContainer.getContainerID())
+ .getContainerState() ==
+ ContainerProtos.ContainerDataProto.State.UNHEALTHY,
+ 1000, 100000);
+ reconstructedDN.set(dn);
+ invocation.callRealMethod();
+ return null;
+ }).when(coordinator).reconstructECBlockGroup(Mockito.any(), Mockito.any(),
+ Mockito.any(), Mockito.any());
+ }
+
+ // Shutting down DN triggers close pipeline and close container.
+ cluster.shutdownHddsDatanode(pipeline.getFirstNode());
+
+
+
+ // Make sure container closed.
+ waitForSCMContainerState(StorageContainerDatanodeProtocolProtos
+ .ContainerReplicaProto.State.CLOSED, container.containerID());
+ //Temporarily stop the RM process.
+ scm.getReplicationManager().stop();
+
+ // Wait for the lower replication.
+ waitForContainerCount(4, container.containerID(), scm);
+
+ // Start the RM to resume the replication process and wait for the
+ // reconstruction.
+ scm.getReplicationManager().start();
+ GenericTestUtils.waitFor(() -> reconstructedDN.get() != null, 10000,
+ 100000);
+ GenericTestUtils.waitFor(() -> reconstructedDN.get()
+ .getDatanodeStateMachine().getContainer().getContainerSet()
+ .getContainer(finalContainer.getContainerID()) == null,
+ 10000, 100000);
+ for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
+ dn.getDatanodeStateMachine().getContainer().getContainerSet()
+ .setRecoveringTimeout(recoveryTimeoutMap.get(dn));
+ }
+ }
+
private void waitForDNContainerState(ContainerInfo container,
StorageContainerManager scm) throws InterruptedException,
TimeoutException {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org