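You are viewing a plain text version of this content. The canonical version is available in the commits@ozone.apache.org mailing-list archive (the original hyperlink was not preserved in this text).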
Posted to commits@ozone.apache.org by bh...@apache.org on 2021/11/03 16:23:38 UTC
[ozone] branch master updated: HDDS-5864. Retry when DN connection
issue during getBlock/ReadChunk call during Ozone key Read (#2746)
This is an automated email from the ASF dual-hosted git repository.
bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 73a081a HDDS-5864. Retry when DN connection issue during getBlock/ReadChunk call during Ozone key Read (#2746)
73a081a is described below
commit 73a081aaf38e43a37e3279d3ac369b4896b17a4e
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Wed Nov 3 09:23:10 2021 -0700
HDDS-5864. Retry when DN connection issue during getBlock/ReadChunk call during Ozone key Read (#2746)
---
.../hadoop/hdds/scm/storage/BlockInputStream.java | 52 ++++++++++++++++++----
.../hadoop/hdds/scm/storage/ChunkInputStream.java | 2 +-
.../storage/DummyBlockInputStreamWithRetry.java | 14 ++++--
.../hdds/scm/storage/TestBlockInputStream.java | 32 ++++++++++++-
.../hdds/scm/storage/ContainerProtocolCalls.java | 5 ---
5 files changed, 87 insertions(+), 18 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index def810f..7475db2 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -42,9 +42,9 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.security.token.Token;
@@ -75,6 +75,7 @@ public class BlockInputStream extends InputStream
private XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
private boolean initialized = false;
+ // TODO: do we need to change retrypolicy based on exception.
private final RetryPolicy retryPolicy =
HddsClientUtils.createRetryPolicy(3, TimeUnit.SECONDS.toMillis(1));
private int retries;
@@ -142,15 +143,34 @@ public class BlockInputStream extends InputStream
return;
}
- List<ChunkInfo> chunks;
- try {
- chunks = getChunkInfos();
- } catch (ContainerNotFoundException ioEx) {
- refreshPipeline(ioEx);
- chunks = getChunkInfos();
+ List<ChunkInfo> chunks = null;
+ IOException catchEx = null;
+ do {
+ try {
+ // If refresh returns new pipeline, retry with it.
+ // If we get IOException due to connectivity issue,
+ // retry according to retry policy.
+ chunks = getChunkInfos();
+ break;
+ } catch(SCMSecurityException ex) {
+ throw ex;
+ } catch (StorageContainerException ex) {
+ refreshPipeline(ex);
+ catchEx = ex;
+ } catch (IOException ex) {
+ LOG.debug("Retry to get chunk info fail", ex);
+ catchEx = ex;
+ }
+ } while (shouldRetryRead(catchEx));
+
+ if (chunks == null) {
+ throw catchEx;
+ } else {
+ // Reset retry count if we get chunks successfully.
+ retries = 0;
}
- if (chunks != null && !chunks.isEmpty()) {
+ if (!chunks.isEmpty()) {
// For each chunk in the block, create a ChunkInputStream and compute
// its chunkOffset
this.chunkOffsets = new long[chunks.size()];
@@ -320,6 +340,17 @@ public class BlockInputStream extends InputStream
} else {
throw e;
}
+ } catch(SCMSecurityException ex) {
+ throw ex;
+ } catch(IOException ex) {
+ // We got a IOException which might be due
+ // to DN down or connectivity issue.
+ if (shouldRetryRead(ex)) {
+ current.releaseClient();
+ continue;
+ } else {
+ throw ex;
+ }
}
if (numBytesRead != numBytesToRead) {
@@ -525,4 +556,9 @@ public class BlockInputStream extends InputStream
public synchronized List<ChunkInputStream> getChunkStreams() {
return chunkStreams;
}
+
+ @VisibleForTesting
+ public static Logger getLog() {
+ return LOG;
+ }
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index f7cfa47..f22e706 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -287,7 +287,7 @@ public class ChunkInputStream extends InputStream
protected synchronized void releaseClient() {
if (xceiverClientFactory != null && xceiverClient != null) {
- xceiverClientFactory.releaseClient(xceiverClient, false);
+ xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
xceiverClient = null;
}
}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
index 556d938..a522004 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
@@ -28,12 +28,14 @@ import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
+
/**
* A dummy BlockInputStream with pipeline refresh function to mock read
* block call to DN.
@@ -42,6 +44,7 @@ final class DummyBlockInputStreamWithRetry
extends DummyBlockInputStream {
private int getChunkInfoCount = 0;
+ private IOException ioException;
@SuppressWarnings("parameternumber")
DummyBlockInputStreamWithRetry(
@@ -53,7 +56,7 @@ final class DummyBlockInputStreamWithRetry
XceiverClientFactory xceiverClientManager,
List<ChunkInfo> chunkList,
Map<String, byte[]> chunkMap,
- AtomicBoolean isRerfreshed) {
+ AtomicBoolean isRerfreshed, IOException ioException) {
super(blockId, blockLen, pipeline, token, verifyChecksum,
xceiverClientManager, blockID -> {
isRerfreshed.set(true);
@@ -65,13 +68,18 @@ final class DummyBlockInputStreamWithRetry
.setNodes(Collections.emptyList())
.build();
}, chunkList, chunkMap);
+ this.ioException = ioException;
}
@Override
protected List<ChunkInfo> getChunkInfos() throws IOException {
if (getChunkInfoCount == 0) {
getChunkInfoCount++;
- throw new ContainerNotFoundException("Exception encountered");
+ if (ioException != null) {
+ throw ioException;
+ }
+ throw new StorageContainerException("Exception encountered",
+ CONTAINER_NOT_FOUND);
} else {
return super.getChunkInfos();
}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index b83aa46..7f0121e 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.LambdaTestUtils;
import org.junit.Assert;
import org.junit.Before;
@@ -39,6 +40,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
import java.io.EOFException;
import java.io.IOException;
@@ -264,7 +267,7 @@ public class TestBlockInputStream {
BlockInputStream blockInputStreamWithRetry =
new DummyBlockInputStreamWithRetry(blockID, blockSize,
MockPipeline.createSingleNodePipeline(), null,
- false, null, chunks, chunkDataMap, isRefreshed);
+ false, null, chunks, chunkDataMap, isRefreshed, null);
try {
Assert.assertFalse(isRefreshed.get());
@@ -278,6 +281,33 @@ public class TestBlockInputStream {
}
@Test
+ public void testGetBlockInfoFailWithIOException() throws Exception {
+ GenericTestUtils.setLogLevel(BlockInputStream.getLog(), Level.DEBUG);
+ GenericTestUtils.LogCapturer logCapturer =
+ GenericTestUtils.LogCapturer.captureLogs(
+ LoggerFactory.getLogger(BlockInputStream.class));
+ BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
+ AtomicBoolean isRefreshed = new AtomicBoolean();
+ createChunkList(5);
+ BlockInputStream blockInputStreamWithRetry =
+ new DummyBlockInputStreamWithRetry(blockID, blockSize,
+ MockPipeline.createSingleNodePipeline(), null,
+ false, null, chunks, chunkDataMap, isRefreshed,
+ new IOException("unavailable"));
+ try {
+ Assert.assertFalse(isRefreshed.get());
+ byte[] b = new byte[200];
+ blockInputStreamWithRetry.read(b, 0, 200);
+ // As in case of IOException we do not do do refresh.
+ Assert.assertFalse(isRefreshed.get());
+ Assert.assertTrue(logCapturer.getOutput().contains(
+ "Retry to get chunk info fail"));
+ } finally {
+ blockInputStreamWithRetry.close();
+ }
+ }
+
+ @Test
public void testRefreshOnReadFailure() throws Exception {
// GIVEN
BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 167ae82..fcf3f13 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -104,10 +103,6 @@ public final class ContainerProtocolCalls {
ContainerCommandRequestProto request = builder.build();
ContainerCommandResponseProto response =
xceiverClient.sendCommand(request, getValidatorList());
- if (ContainerProtos.Result.CONTAINER_NOT_FOUND.equals(
- response.getResult())) {
- throw new ContainerNotFoundException(response.getMessage());
- }
return response.getGetBlock();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org