Posted to commits@ozone.apache.org by bh...@apache.org on 2021/11/03 16:23:38 UTC

[ozone] branch master updated: HDDS-5864. Retry when DN connection issue during getBlock/ReadChunk call during Ozone key Read (#2746)

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 73a081a  HDDS-5864. Retry when DN connection issue during getBlock/ReadChunk call during Ozone key Read (#2746)
73a081a is described below

commit 73a081aaf38e43a37e3279d3ac369b4896b17a4e
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Wed Nov 3 09:23:10 2021 -0700

    HDDS-5864. Retry when DN connection issue during getBlock/ReadChunk call during Ozone key Read (#2746)
---
 .../hadoop/hdds/scm/storage/BlockInputStream.java  | 52 ++++++++++++++++++----
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  |  2 +-
 .../storage/DummyBlockInputStreamWithRetry.java    | 14 ++++--
 .../hdds/scm/storage/TestBlockInputStream.java     | 32 ++++++++++++-
 .../hdds/scm/storage/ContainerProtocolCalls.java   |  5 ---
 5 files changed, 87 insertions(+), 18 deletions(-)
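
The heart of the change is a retry loop in BlockInputStream, driven by the
Hadoop RetryPolicy field this patch adds (up to 3 retries with a fixed one
second sleep, per HddsClientUtils.createRetryPolicy). The shouldRetryRead
helper the new code calls is not shown in this diff; below is a minimal
sketch of what such a helper could look like, assuming it simply consults
the policy and honours its suggested delay (illustrative, not the committed
code):

    // Minimal sketch (assumed, not the committed code): ask the RetryPolicy
    // whether this IOException is worth another attempt, honouring its delay.
    private boolean shouldRetryRead(IOException cause) throws IOException {
      RetryPolicy.RetryAction action;
      try {
        action = retryPolicy.shouldRetry(cause, ++retries, 0, true);
      } catch (Exception e) {
        throw cause;  // the policy itself failed; surface the original error
      }
      if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
        if (action.delayMillis > 0) {
          try {
            Thread.sleep(action.delayMillis);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting to retry", ie);
          }
        }
        return true;
      }
      return false;
    }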

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index def810f..7475db2 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@ -42,9 +42,9 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.security.token.Token;
@@ -75,6 +75,7 @@ public class BlockInputStream extends InputStream
   private XceiverClientFactory xceiverClientFactory;
   private XceiverClientSpi xceiverClient;
   private boolean initialized = false;
+  // TODO: do we need to change the retry policy based on the exception type?
   private final RetryPolicy retryPolicy =
       HddsClientUtils.createRetryPolicy(3, TimeUnit.SECONDS.toMillis(1));
   private int retries;
@@ -142,15 +143,34 @@ public class BlockInputStream extends InputStream
       return;
     }
 
-    List<ChunkInfo> chunks;
-    try {
-      chunks = getChunkInfos();
-    } catch (ContainerNotFoundException ioEx) {
-      refreshPipeline(ioEx);
-      chunks = getChunkInfos();
+    List<ChunkInfo> chunks = null;
+    IOException catchEx = null;
+    do {
+      try {
+        // If refresh returns a new pipeline, retry with it.
+        // If we get an IOException due to a connectivity issue,
+        // retry according to the retry policy.
+        chunks = getChunkInfos();
+        break;
+      } catch (SCMSecurityException ex) {
+        throw ex;
+      } catch (StorageContainerException ex) {
+        refreshPipeline(ex);
+        catchEx = ex;
+      } catch (IOException ex) {
+        LOG.debug("Retry to get chunk info failed", ex);
+        catchEx = ex;
+      }
+    } while (shouldRetryRead(catchEx));
+
+    if (chunks == null) {
+      throw catchEx;
+    } else {
+      // Reset retry count if we get chunks successfully.
+      retries = 0;
     }
 
-    if (chunks != null && !chunks.isEmpty()) {
+    if (!chunks.isEmpty()) {
       // For each chunk in the block, create a ChunkInputStream and compute
       // its chunkOffset
       this.chunkOffsets = new long[chunks.size()];
@@ -320,6 +340,17 @@ public class BlockInputStream extends InputStream
         } else {
           throw e;
         }
+      } catch (SCMSecurityException ex) {
+        throw ex;
+      } catch (IOException ex) {
+        // We got an IOException, which might be due to the DN being
+        // down or a connectivity issue.
+        if (shouldRetryRead(ex)) {
+          current.releaseClient();
+          continue;
+        } else {
+          throw ex;
+        }
       }
 
       if (numBytesRead != numBytesToRead) {
@@ -525,4 +556,9 @@ public class BlockInputStream extends InputStream
   public synchronized List<ChunkInputStream> getChunkStreams() {
     return chunkStreams;
   }
+
+  @VisibleForTesting
+  public static Logger getLog() {
+    return LOG;
+  }
 }
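
For context, the new catch block above lives inside BlockInputStream's
per-chunk read loop. The essence of the pattern, condensed with a
hypothetical helper (currentChunkStream is a stand-in for the loop's
bookkeeping, not a real method):

    // Hypothetical condensation of the read-retry pattern in this patch:
    // on a connection-level IOException, release the cached DN client and
    // retry the same read instead of failing the whole key read.
    private int readWithRetry(byte[] b, int off, int len) throws IOException {
      while (true) {
        ChunkInputStream current = currentChunkStream();
        try {
          return current.read(b, off, len);
        } catch (IOException ex) {
          if (!shouldRetryRead(ex)) {
            throw ex;
          }
          // Drop the (possibly broken) client; the next attempt acquires a
          // fresh one from the XceiverClientFactory.
          current.releaseClient();
        }
      }
    }

Note that SCMSecurityException is rethrown before the generic IOException
handler gets a chance to retry: a token or certificate problem will not be
fixed by trying the same datanode again.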
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
index f7cfa47..f22e706 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java
@@ -287,7 +287,7 @@ public class ChunkInputStream extends InputStream
 
   protected synchronized void releaseClient() {
     if (xceiverClientFactory != null && xceiverClient != null) {
-      xceiverClientFactory.releaseClient(xceiverClient, false);
+      xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
       xceiverClient = null;
     }
   }
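
The one-line ChunkInputStream change pairs the release call with how the
client was acquired: clients used for reading data are tracked separately
in the client cache, so they have to be returned through the read-specific
method. The shape of the factory methods involved, as inferred from the
call sites (illustrative, not quoted from the interface):

    // Assumed shape of the relevant XceiverClientFactory methods
    // (illustrative; signatures inferred from the call sites):
    public interface XceiverClientFactory {
      XceiverClientSpi acquireClientForReadData(Pipeline pipeline)
          throws IOException;
      // invalidateClient=true evicts the client from the read-client cache,
      // e.g. after an unrecoverable connection error; false keeps it cached.
      void releaseClientForReadData(XceiverClientSpi client,
          boolean invalidateClient);
    }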
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
index 556d938..a522004 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/DummyBlockInputStreamWithRetry.java
@@ -28,12 +28,14 @@ import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.XceiverClientFactory;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND;
+
 /**
  * A dummy BlockInputStream with pipeline refresh function to mock read
  * block call to DN.
@@ -42,6 +44,7 @@ final class DummyBlockInputStreamWithRetry
     extends DummyBlockInputStream {
 
   private int getChunkInfoCount = 0;
+  private IOException ioException;
 
   @SuppressWarnings("parameternumber")
   DummyBlockInputStreamWithRetry(
@@ -53,7 +56,7 @@ final class DummyBlockInputStreamWithRetry
       XceiverClientFactory xceiverClientManager,
       List<ChunkInfo> chunkList,
       Map<String, byte[]> chunkMap,
-      AtomicBoolean isRerfreshed) {
+      AtomicBoolean isRerfreshed, IOException ioException) {
     super(blockId, blockLen, pipeline, token, verifyChecksum,
         xceiverClientManager, blockID -> {
           isRerfreshed.set(true);
@@ -65,13 +68,18 @@ final class DummyBlockInputStreamWithRetry
               .setNodes(Collections.emptyList())
               .build();
         }, chunkList, chunkMap);
+    this.ioException = ioException;
   }
 
   @Override
   protected List<ChunkInfo> getChunkInfos() throws IOException {
     if (getChunkInfoCount == 0) {
       getChunkInfoCount++;
-      throw new ContainerNotFoundException("Exception encountered");
+      if (ioException != null) {
+        throw ioException;
+      }
+      throw new StorageContainerException("Exception encountered",
+          CONTAINER_NOT_FOUND);
     } else {
       return super.getChunkInfos();
     }
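
With the extra constructor argument the dummy can now exercise both failure
paths (argument lists abbreviated with "..." here):

    // Refresh path: no injected exception, so the first getChunkInfos()
    // throws StorageContainerException(CONTAINER_NOT_FOUND) and the stream
    // refreshes its pipeline before retrying.
    new DummyBlockInputStreamWithRetry(..., isRefreshed, null);

    // Retry path: the injected IOException is thrown instead, so the stream
    // retries per the RetryPolicy without refreshing the pipeline.
    new DummyBlockInputStreamWithRetry(..., isRefreshed,
        new IOException("unavailable"));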
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
index b83aa46..7f0121e 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockInputStream.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.common.Checksum;
 
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
+import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.LambdaTestUtils;
 import org.junit.Assert;
 import org.junit.Before;
@@ -39,6 +40,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -264,7 +267,7 @@ public class TestBlockInputStream {
     BlockInputStream blockInputStreamWithRetry =
         new DummyBlockInputStreamWithRetry(blockID, blockSize,
             MockPipeline.createSingleNodePipeline(), null,
-            false, null, chunks, chunkDataMap, isRefreshed);
+            false, null, chunks, chunkDataMap, isRefreshed, null);
 
     try {
       Assert.assertFalse(isRefreshed.get());
@@ -278,6 +281,33 @@ public class TestBlockInputStream {
   }
 
   @Test
+  public void testGetBlockInfoFailWithIOException() throws Exception {
+    GenericTestUtils.setLogLevel(BlockInputStream.getLog(), Level.DEBUG);
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(
+            LoggerFactory.getLogger(BlockInputStream.class));
+    BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
+    AtomicBoolean isRefreshed = new AtomicBoolean();
+    createChunkList(5);
+    BlockInputStream blockInputStreamWithRetry =
+        new DummyBlockInputStreamWithRetry(blockID, blockSize,
+            MockPipeline.createSingleNodePipeline(), null,
+            false, null, chunks, chunkDataMap, isRefreshed,
+            new IOException("unavailable"));
+    try {
+      Assert.assertFalse(isRefreshed.get());
+      byte[] b = new byte[200];
+      blockInputStreamWithRetry.read(b, 0, 200);
+      // In the case of an IOException we do not refresh the pipeline.
+      Assert.assertFalse(isRefreshed.get());
+      Assert.assertTrue(logCapturer.getOutput().contains(
+          "Retry to get chunk info fail"));
+    } finally {
+      blockInputStreamWithRetry.close();
+    }
+  }
+
+  @Test
   public void testRefreshOnReadFailure() throws Exception {
     // GIVEN
     BlockID blockID = new BlockID(new ContainerBlockID(1, 1));
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index 167ae82..fcf3f13 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
-import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.BlockNotCommittedException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -104,10 +103,6 @@ public final class ContainerProtocolCalls  {
     ContainerCommandRequestProto request = builder.build();
     ContainerCommandResponseProto response =
         xceiverClient.sendCommand(request, getValidatorList());
-    if (ContainerProtos.Result.CONTAINER_NOT_FOUND.equals(
-        response.getResult())) {
-      throw new ContainerNotFoundException(response.getMessage());
-    }
     return response.getGetBlock();
   }
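
Dropping the explicit CONTAINER_NOT_FOUND check is safe because the
response validators already turn any non-SUCCESS result into a
StorageContainerException, which BlockInputStream now catches and answers
with a pipeline refresh. A simplified sketch of that existing validator
behaviour (abridged; the real method also maps some results to more
specific exception types such as ContainerNotOpenException):

    static void validateContainerResponse(
        ContainerCommandResponseProto response)
        throws StorageContainerException {
      if (response.getResult() == ContainerProtos.Result.SUCCESS) {
        return;
      }
      // CONTAINER_NOT_FOUND and other failures surface here, carrying the
      // result code for the caller to inspect.
      throw new StorageContainerException(
          response.getMessage(), response.getResult());
    }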
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org