You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by we...@apache.org on 2022/05/06 17:30:50 UTC

[hadoop] branch trunk updated: HDFS-16520. Improve EC pread: avoid potential reading whole block (#4104)

This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 29401c82037 HDFS-16520. Improve EC pread: avoid potential reading whole block (#4104)
29401c82037 is described below

commit 29401c820377d02a992eecde51083cf87f8e57af
Author: daimin <da...@outlook.com>
AuthorDate: Sat May 7 01:30:32 2022 +0800

    HDFS-16520. Improve EC pread: avoid potential reading whole block (#4104)
    
    Reviewed-by: Hui Fei <fe...@apache.org>
    Reviewed-by: Takanobu Asanuma <ta...@apache.org>
    Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
---
 .../apache/hadoop/hdfs/DFSClientFaultInjector.java |  4 ++
 .../apache/hadoop/hdfs/DFSStripedInputStream.java  | 15 ++++-
 .../java/org/apache/hadoop/hdfs/StripeReader.java  |  8 ++-
 .../hadoop/hdfs/TestDFSStripedInputStream.java     | 68 ++++++++++++++++++++++
 4 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index ecea795ee5f..caf8aad32e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.classification.VisibleForTesting;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 
 /**
  * Used for injecting faults in DFSClient and DFSOutputStream tests.
@@ -65,4 +66,7 @@ public class DFSClientFaultInjector {
   public void sleepBeforeHedgedGet() {}
 
   public void delayWhenRenewLeaseTimeout() {}
+
+  public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long offset, long length) {}
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 6377bc461de..5ae51709593 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -232,7 +232,7 @@ public class DFSStripedInputStream extends DFSInputStream {
 
   boolean createBlockReader(LocatedBlock block, long offsetInBlock,
       LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
-      int chunkIndex) throws IOException {
+      int chunkIndex, long readTo) throws IOException {
     BlockReader reader = null;
     final ReaderRetryPolicy retry = new ReaderRetryPolicy();
     DFSInputStream.DNAddrPair dnInfo =
@@ -250,9 +250,14 @@ public class DFSStripedInputStream extends DFSInputStream {
         if (dnInfo == null) {
           break;
         }
+        if (readTo < 0 || readTo > block.getBlockSize()) {
+          readTo = block.getBlockSize();
+        }
         reader = getBlockReader(block, offsetInBlock,
-            block.getBlockSize() - offsetInBlock,
+            readTo - offsetInBlock,
             dnInfo.addr, dnInfo.storageType, dnInfo.info);
+        DFSClientFaultInjector.get().onCreateBlockReader(block, chunkIndex, offsetInBlock,
+            readTo - offsetInBlock);
       } catch (IOException e) {
         if (e instanceof InvalidEncryptionKeyException &&
             retry.shouldRefetchEncryptionKey()) {
@@ -485,11 +490,16 @@ public class DFSStripedInputStream extends DFSInputStream {
     final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(
         blockGroup, cellSize, dataBlkNum, parityBlkNum);
     final BlockReaderInfo[] preaderInfos = new BlockReaderInfo[groupSize];
+    long readTo = -1;
+    for (AlignedStripe stripe : stripes) {
+      readTo = Math.max(readTo, stripe.getOffsetInBlock() + stripe.getSpanInBlock());
+    }
     try {
       for (AlignedStripe stripe : stripes) {
         // Parse group to get chosen DN location
         StripeReader preader = new PositionStripeReader(stripe, ecPolicy, blks,
             preaderInfos, corruptedBlocks, decoder, this);
+        preader.setReadTo(readTo);
         try {
           preader.readStripe();
         } finally {
@@ -554,4 +564,5 @@ public class DFSStripedInputStream extends DFSInputStream {
       parityBuf = null;
     }
   }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index 932ddb491cc..3fc87c7952a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -119,6 +119,7 @@ abstract class StripeReader {
   protected final int cellSize;
   protected final RawErasureDecoder decoder;
   protected final DFSStripedInputStream dfsStripedInputStream;
+  private long readTo = -1;
 
   protected ECChunk[] decodeInputs;
 
@@ -302,7 +303,7 @@ abstract class StripeReader {
     if (readerInfos[chunkIndex] == null) {
       if (!dfsStripedInputStream.createBlockReader(block,
           alignedStripe.getOffsetInBlock(), targetBlocks,
-          readerInfos, chunkIndex)) {
+          readerInfos, chunkIndex, readTo)) {
         chunk.state = StripingChunk.MISSING;
         return false;
       }
@@ -478,4 +479,9 @@ abstract class StripeReader {
   boolean useDirectBuffer() {
     return decoder.preferDirectBuffer();
   }
+
+  public void setReadTo(long readTo) {
+    this.readTo = readTo;
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index aedea3c8acd..12cfd49a0bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -49,7 +49,9 @@ import org.junit.rules.Timeout;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
@@ -664,4 +666,70 @@ public class TestDFSStripedInputStream {
       assertNull(in.parityBuf);
       in.close();
   }
+
+  @Test
+  public void testBlockReader() throws Exception {
+    ecPolicy = StripedFileTestUtil.getDefaultECPolicy(); // RS-6-3-1024k
+    int fileSize = 19 * cellSize + 100;
+    long stripeSize = (long) dataBlocks * cellSize;
+    byte[] bytes = StripedFileTestUtil.generateBytes(fileSize);
+    DFSTestUtil.writeFile(fs, filePath, new String(bytes));
+
+    try (DFSStripedInputStream in =
+             (DFSStripedInputStream) fs.getClient().open(filePath.toString())) {
+      // Verify pread:
+      verifyPreadRanges(in, 0, 2 * cellSize,
+          2 * cellSize, Arrays.asList("0_0_1048576", "1_0_1048576"));
+      verifyPreadRanges(in, 0, 5 * cellSize + 9527,
+          5 * cellSize + 9527, Arrays.asList("0_0_1048576", "1_0_1048576",
+              "2_0_1048576", "3_0_1048576", "4_0_1048576", "5_0_1048576"));
+      verifyPreadRanges(in, 100, 5 * cellSize + 9527,
+          5 * cellSize + 9527, Arrays.asList("0_100_1048476", "1_0_1048576",
+              "2_0_1048576", "3_0_1048576", "4_0_1048576", "5_0_1048576"));
+      verifyPreadRanges(in, stripeSize * 3, 2 * cellSize,
+          cellSize + 100, Arrays.asList("0_1048576_1048576", "1_1048576_100"));
+
+      // Verify sread:
+      verifySreadRanges(in, 0, Arrays.asList("0_0_2097152", "1_0_2097152",
+          "2_0_2097152", "3_0_2097152", "4_0_2097152", "5_0_2097152"));
+      verifySreadRanges(in, stripeSize * 2, Arrays.asList("0_0_2097152", "1_0_1048676",
+          "2_0_1048576", "3_0_1048576", "4_0_1048576", "5_0_1048576"));
+    }
+  }
+
+  private void verifyPreadRanges(DFSStripedInputStream in, long position,
+                                 int length, int lengthExpected,
+                                 List<String> rangesExpected) throws Exception {
+    List<String> ranges = new ArrayList<>(); // range format: chunkIndex_offset_len
+    DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+      @Override
+      public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
+                                      long offset, long length) {
+        ranges.add(String.format("%s_%s_%s", chunkIndex, offset, length));
+      }
+    });
+    assertEquals(lengthExpected, in.read(position, new byte[length], 0, length));
+    Collections.sort(ranges);
+    Collections.sort(rangesExpected);
+    assertEquals(rangesExpected, ranges);
+  }
+
+  private void verifySreadRanges(DFSStripedInputStream in, long position,
+                                 List<String> rangesExpected) throws Exception {
+    List<String> ranges = new ArrayList<>(); // range format: chunkIndex_offset_len
+    DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+      @Override
+      public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
+                                      long offset, long length) {
+        ranges.add(String.format("%s_%s_%s", chunkIndex, offset, length));
+      }
+    });
+    in.seek(position);
+    int length = in.read(new byte[1024]);
+    assertEquals(1024, length);
+    Collections.sort(ranges);
+    Collections.sort(rangesExpected);
+    assertEquals(rangesExpected, ranges);
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org