You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/11/08 13:41:54 UTC

[hadoop] branch branch-3.3 updated (d33ee67151b -> b1ea32f91cb)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


    from d33ee67151b Hadoop-18520. Backport HADOOP-18427 and HADOOP-18452 to branch-3.3 (#5118)
     new c3920757616 HADOOP-18507. VectorIO FileRange type to support a "reference" field (#5076)
     new b1ea32f91cb HADOOP-18517. ABFS: Add fs.azure.enable.readahead option to disable readahead (#5103)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/hadoop/fs/FileRange.java  |  22 +++-
 .../apache/hadoop/fs/impl/CombinedFileRange.java   |   4 +-
 .../org/apache/hadoop/fs/impl/FileRangeImpl.java   |  19 ++-
 .../apache/hadoop/fs/TestVectoredReadUtils.java    | 134 +++++++++++++++------
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  14 +++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |   1 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   7 ++
 .../constants/FileSystemConfigurations.java        |   1 +
 .../fs/azurebfs/services/AbfsInputStream.java      |   7 +-
 .../azurebfs/services/AbfsInputStreamContext.java  |  12 ++
 .../fs/azurebfs/ITestAbfsReadWriteAndSeek.java     |  32 +++--
 .../hadoop/fs/azurebfs/TestTracingContext.java     |   4 +-
 .../hadoop/benchmark/VectoredReadBenchmark.java    |   2 +-
 13 files changed, 207 insertions(+), 52 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 01/02: HADOOP-18507. VectorIO FileRange type to support a "reference" field (#5076)

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit c3920757616236e5edc6108e92862f8a0a6e1c2b
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Mon Oct 31 21:12:13 2022 +0000

    HADOOP-18507. VectorIO FileRange type to support a "reference" field (#5076)
    
    Contributed by Steve Loughran
---
 .../main/java/org/apache/hadoop/fs/FileRange.java  |  22 +++-
 .../apache/hadoop/fs/impl/CombinedFileRange.java   |   4 +-
 .../org/apache/hadoop/fs/impl/FileRangeImpl.java   |  19 ++-
 .../apache/hadoop/fs/TestVectoredReadUtils.java    | 134 +++++++++++++++------
 .../hadoop/benchmark/VectoredReadBenchmark.java    |   2 +-
 5 files changed, 141 insertions(+), 40 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java
index e55696e9650..97da65585d6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java
@@ -55,6 +55,15 @@ public interface FileRange {
    */
   void setData(CompletableFuture<ByteBuffer> data);
 
+  /**
+   * Get any reference passed in to the file range constructor.
+   * This is not used by any implementation code; it is to help
+   * bind this API to libraries retrieving multiple stripes of
+   * data in parallel.
+   * @return a reference or null.
+   */
+  Object getReference();
+
   /**
    * Factory method to create a FileRange object.
    * @param offset starting offset of the range.
@@ -62,6 +71,17 @@ public interface FileRange {
    * @return a new instance of FileRangeImpl.
    */
   static FileRange createFileRange(long offset, int length) {
-    return new FileRangeImpl(offset, length);
+    return new FileRangeImpl(offset, length, null);
+  }
+
+  /**
+   * Factory method to create a FileRange object.
+   * @param offset starting offset of the range.
+   * @param length length of the range.
+   * @param reference nullable reference to store in the range.
+   * @return a new instance of FileRangeImpl.
+   */
+  static FileRange createFileRange(long offset, int length, Object reference) {
+    return new FileRangeImpl(offset, length, reference);
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java
index 516bbb2c70c..c9555a1e541 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java
@@ -29,10 +29,10 @@ import java.util.List;
  * together into a single read for efficiency.
  */
 public class CombinedFileRange extends FileRangeImpl {
-  private ArrayList<FileRange> underlying = new ArrayList<>();
+  private List<FileRange> underlying = new ArrayList<>();
 
   public CombinedFileRange(long offset, long end, FileRange original) {
-    super(offset, (int) (end - offset));
+    super(offset, (int) (end - offset), null);
     this.underlying.add(original);
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java
index 041e5f0a8d2..1239be764ba 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java
@@ -34,9 +34,21 @@ public class FileRangeImpl implements FileRange {
   private int length;
   private CompletableFuture<ByteBuffer> reader;
 
-  public FileRangeImpl(long offset, int length) {
+  /**
+   * nullable reference to store in the range.
+   */
+  private final Object reference;
+
+  /**
+   * Create.
+   * @param offset offset in file
+   * @param length length of data to read.
+   * @param reference nullable reference to store in the range.
+   */
+  public FileRangeImpl(long offset, int length, Object reference) {
     this.offset = offset;
     this.length = length;
+    this.reference = reference;
   }
 
   @Override
@@ -71,4 +83,9 @@ public class FileRangeImpl implements FileRange {
   public CompletableFuture<ByteBuffer> getData() {
     return reader;
   }
+
+  @Override
+  public Object getReference() {
+    return reference;
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java
index ebf0e14053b..fdfa8f6eb6f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java
@@ -96,7 +96,10 @@ public class TestVectoredReadUtils extends HadoopTestBase {
 
   @Test
   public void testMerge() {
-    FileRange base = FileRange.createFileRange(2000, 1000);
+    // a reference to use for tracking
+    Object tracker1 = "one";
+    Object tracker2 = "two";
+    FileRange base = FileRange.createFileRange(2000, 1000, tracker1);
     CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base);
 
     // test when the gap between is too big
@@ -104,44 +107,48 @@ public class TestVectoredReadUtils extends HadoopTestBase {
         FileRange.createFileRange(5000, 1000), 2000, 4000));
     assertEquals("Number of ranges in merged range shouldn't increase",
             1, mergeBase.getUnderlying().size());
-    assertEquals("post merge offset", 2000, mergeBase.getOffset());
-    assertEquals("post merge length", 1000, mergeBase.getLength());
+    assertFileRange(mergeBase, 2000, 1000);
 
     // test when the total size gets exceeded
     assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000,
         FileRange.createFileRange(5000, 1000), 2001, 3999));
     assertEquals("Number of ranges in merged range shouldn't increase",
             1, mergeBase.getUnderlying().size());
-    assertEquals("post merge offset", 2000, mergeBase.getOffset());
-    assertEquals("post merge length", 1000, mergeBase.getLength());
+    assertFileRange(mergeBase, 2000, 1000);
 
     // test when the merge works
     assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000,
-        FileRange.createFileRange(5000, 1000), 2001, 4000));
+        FileRange.createFileRange(5000, 1000, tracker2),
+        2001, 4000));
     assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
-    assertEquals("post merge offset", 2000, mergeBase.getOffset());
-    assertEquals("post merge length", 4000, mergeBase.getLength());
+    assertFileRange(mergeBase, 2000, 4000);
+
+    Assertions.assertThat(mergeBase.getUnderlying().get(0).getReference())
+        .describedAs("reference of range %s", mergeBase.getUnderlying().get(0))
+        .isSameAs(tracker1);
+    Assertions.assertThat(mergeBase.getUnderlying().get(1).getReference())
+        .describedAs("reference of range %s", mergeBase.getUnderlying().get(1))
+        .isSameAs(tracker2);
 
     // reset the mergeBase and test with a 10:1 reduction
     mergeBase = new CombinedFileRange(200, 300, base);
-    assertEquals(200, mergeBase.getOffset());
-    assertEquals(100, mergeBase.getLength());
+    assertFileRange(mergeBase, 200, 100);
+
     assertTrue("ranges should get merged ", mergeBase.merge(500, 600,
         FileRange.createFileRange(5000, 1000), 201, 400));
     assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
-    assertEquals("post merge offset", 200, mergeBase.getOffset());
-    assertEquals("post merge length", 400, mergeBase.getLength());
+    assertFileRange(mergeBase, 200, 400);
   }
 
   @Test
   public void testSortAndMerge() {
     List<FileRange> input = Arrays.asList(
-        FileRange.createFileRange(3000, 100),
-        FileRange.createFileRange(2100, 100),
-        FileRange.createFileRange(1000, 100)
+        FileRange.createFileRange(3000, 100, "1"),
+        FileRange.createFileRange(2100, 100, null),
+        FileRange.createFileRange(1000, 100, "3")
         );
     assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
-    List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
+    final List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
             Arrays.asList(sortRanges(input)), 100, 1001, 2500);
     Assertions.assertThat(outputList)
             .describedAs("merged range size")
@@ -150,51 +157,105 @@ public class TestVectoredReadUtils extends HadoopTestBase {
     Assertions.assertThat(output.getUnderlying())
             .describedAs("merged range underlying size")
             .hasSize(3);
-    assertEquals("range[1000,3100)", output.toString());
+    // range[1000,3100)
+    assertFileRange(output, 1000, 2100);
     assertTrue("merged output ranges are disjoint",
             VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
 
     // the minSeek doesn't allow the first two to merge
     assertFalse("Ranges are non disjoint",
             VectoredReadUtils.isOrderedDisjoint(input, 100, 1000));
-    outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
+    final List<CombinedFileRange> list2 = VectoredReadUtils.mergeSortedRanges(
+        Arrays.asList(sortRanges(input)),
             100, 1000, 2100);
-    Assertions.assertThat(outputList)
+    Assertions.assertThat(list2)
             .describedAs("merged range size")
             .hasSize(2);
-    assertEquals("range[1000,1100)", outputList.get(0).toString());
-    assertEquals("range[2100,3100)", outputList.get(1).toString());
+    assertFileRange(list2.get(0), 1000, 100);
+
+    // range[2100,3100)
+    assertFileRange(list2.get(1), 2100, 1000);
+
     assertTrue("merged output ranges are disjoint",
-            VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000));
+            VectoredReadUtils.isOrderedDisjoint(list2, 100, 1000));
 
     // the maxSize doesn't allow the third range to merge
     assertFalse("Ranges are non disjoint",
             VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
-    outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
+    final List<CombinedFileRange> list3 = VectoredReadUtils.mergeSortedRanges(
+        Arrays.asList(sortRanges(input)),
             100, 1001, 2099);
-    Assertions.assertThat(outputList)
+    Assertions.assertThat(list3)
             .describedAs("merged range size")
             .hasSize(2);
-    assertEquals("range[1000,2200)", outputList.get(0).toString());
-    assertEquals("range[3000,3100)", outputList.get(1).toString());
+    // range[1000,2200)
+    CombinedFileRange range0 = list3.get(0);
+    assertFileRange(range0, 1000, 1200);
+    assertFileRange(range0.getUnderlying().get(0),
+        1000, 100, "3");
+    assertFileRange(range0.getUnderlying().get(1),
+        2100, 100, null);
+    CombinedFileRange range1 = list3.get(1);
+    // range[3000,3100)
+    assertFileRange(range1, 3000, 100);
+    assertFileRange(range1.getUnderlying().get(0),
+        3000, 100, "1");
+
     assertTrue("merged output ranges are disjoint",
-            VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
+            VectoredReadUtils.isOrderedDisjoint(list3, 100, 800));
 
     // test the round up and round down (the maxSize doesn't allow any merges)
     assertFalse("Ranges are non disjoint",
             VectoredReadUtils.isOrderedDisjoint(input, 16, 700));
-    outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
+    final List<CombinedFileRange> list4 = VectoredReadUtils.mergeSortedRanges(
+        Arrays.asList(sortRanges(input)),
             16, 1001, 100);
-    Assertions.assertThat(outputList)
+    Assertions.assertThat(list4)
             .describedAs("merged range size")
             .hasSize(3);
-    assertEquals("range[992,1104)", outputList.get(0).toString());
-    assertEquals("range[2096,2208)", outputList.get(1).toString());
-    assertEquals("range[2992,3104)", outputList.get(2).toString());
+    // range[992,1104)
+    assertFileRange(list4.get(0), 992, 112);
+    // range[2096,2208)
+    assertFileRange(list4.get(1), 2096, 112);
+    // range[2992,3104)
+    assertFileRange(list4.get(2), 2992, 112);
     assertTrue("merged output ranges are disjoint",
-            VectoredReadUtils.isOrderedDisjoint(outputList, 16, 700));
+            VectoredReadUtils.isOrderedDisjoint(list4, 16, 700));
+  }
+
+  /**
+   * Assert that a file range satisfies the conditions.
+   * @param range range to validate
+   * @param offset offset of range
+   * @param length range length
+   */
+  private void assertFileRange(FileRange range, long offset, int length) {
+    Assertions.assertThat(range)
+        .describedAs("file range %s", range)
+        .isNotNull();
+    Assertions.assertThat(range.getOffset())
+        .describedAs("offset of %s", range)
+        .isEqualTo(offset);
+    Assertions.assertThat(range.getLength())
+        .describedAs("length of %s", range)
+        .isEqualTo(length);
+  }
+
+  /**
+   * Assert that a file range satisfies the conditions.
+   * @param range range to validate
+   * @param offset offset of range
+   * @param length range length
+   * @param reference reference; may be null.
+   */
+  private void assertFileRange(FileRange range, long offset, int length, Object reference) {
+    assertFileRange(range, offset, length);
+    Assertions.assertThat(range.getReference())
+        .describedAs("reference field of file range %s", range)
+        .isEqualTo(reference);
   }
 
+
   @Test
   public void testSortAndMergeMoreCases() throws Exception {
     List<FileRange> input = Arrays.asList(
@@ -214,7 +275,9 @@ public class TestVectoredReadUtils extends HadoopTestBase {
     Assertions.assertThat(output.getUnderlying())
             .describedAs("merged range underlying size")
             .hasSize(4);
-    assertEquals("range[1000,3110)", output.toString());
+
+    assertFileRange(output, 1000, 2110);
+
     assertTrue("merged output ranges are disjoint",
             VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
 
@@ -227,7 +290,8 @@ public class TestVectoredReadUtils extends HadoopTestBase {
     Assertions.assertThat(output.getUnderlying())
             .describedAs("merged range underlying size")
             .hasSize(4);
-    assertEquals("range[1000,3200)", output.toString());
+    assertFileRange(output, 1000, 2200);
+
     assertTrue("merged output ranges are disjoint",
             VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
 
diff --git a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java
index 631842f78e2..5df46c36786 100644
--- a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java
+++ b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java
@@ -169,7 +169,7 @@ public class VectoredReadBenchmark {
 
     FileRangeCallback(AsynchronousFileChannel channel, long offset,
                       int length, Joiner joiner, ByteBuffer buffer) {
-      super(offset, length);
+      super(offset, length, null);
       this.channel = channel;
       this.joiner = joiner;
       this.buffer = buffer;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 02/02: HADOOP-18517. ABFS: Add fs.azure.enable.readahead option to disable readahead (#5103)

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit b1ea32f91cb594fe7056c8ec94341fce8c2b64f3
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Tue Nov 8 11:43:04 2022 +0000

    HADOOP-18517. ABFS: Add fs.azure.enable.readahead option to disable readahead (#5103)
    
    * HADOOP-18517. ABFS: Add fs.azure.enable.readahead option to disable readahead
    
    Adds new config option to turn off readahead
    * also allows it to be passed in through openFile(),
    * extends ITestAbfsReadWriteAndSeek to use the option, including one
      replicated test...that shows that turning it off is slower.
    
    Important: this does not address the critical data corruption issue
    HADOOP-18521. ABFS ReadBufferManager buffer sharing across concurrent HTTP requests
    
    What it does do is provide a way to completely bypass the ReadBufferManager.
    To mitigate the problem, either fs.azure.enable.readahead needs to be set to false,
    or set "fs.azure.readaheadqueue.depth" to 0 - this still goes near the (broken)
    ReadBufferManager code, but doesn't trigger the bug.
    
    For safe reading of files through the ABFS connector, readahead MUST be disabled
    or the followup fix to HADOOP-18521 applied
    
    Contributed by Steve Loughran
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      | 14 ++++++++++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  1 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  7 +++++
 .../constants/FileSystemConfigurations.java        |  1 +
 .../fs/azurebfs/services/AbfsInputStream.java      |  7 ++++-
 .../azurebfs/services/AbfsInputStreamContext.java  | 12 ++++++++
 .../fs/azurebfs/ITestAbfsReadWriteAndSeek.java     | 32 ++++++++++++++++------
 .../hadoop/fs/azurebfs/TestTracingContext.java     |  4 +--
 8 files changed, 66 insertions(+), 12 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 6462cc96009..4c77d2e136d 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -302,6 +302,11 @@ public class AbfsConfiguration{
           DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
   private boolean trackLatency;
 
+  @BooleanConfigurationValidatorAnnotation(
+      ConfigurationKey = FS_AZURE_ENABLE_READAHEAD,
+      DefaultValue = DEFAULT_ENABLE_READAHEAD)
+  private boolean enabledReadAhead;
+
   @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
       MinValue = 0,
       DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
@@ -915,6 +920,15 @@ public class AbfsConfiguration{
     }
   }
 
+  public boolean isReadAheadEnabled() {
+    return this.enabledReadAhead;
+  }
+
+  @VisibleForTesting
+  void setReadAheadEnabled(final boolean enabledReadAhead) {
+    this.enabledReadAhead = enabledReadAhead;
+  }
+
   public int getReadAheadRange() {
     return this.readAheadRange;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 28c3ef25e94..3941f6e421a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -808,6 +808,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
             .withReadBufferSize(abfsConfiguration.getReadBufferSize())
             .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
             .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
+            .isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
             .withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
             .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
             .withReadAheadRange(abfsConfiguration.getReadAheadRange())
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 9d3b2d5e82c..0353f3e01ff 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -186,6 +186,13 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement";
   public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
   public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
+
+  /**
+   * Enable or disable readahead buffer in AbfsInputStream.
+   * Value: {@value}.
+   */
+  public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead";
+
   /** Setting this true will make the driver use it's own RemoteIterator implementation */
   public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
   /** Server side encryption key */
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 63d62a33b18..42f3b7503e0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -106,6 +106,7 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
   public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
 
+  public static final boolean DEFAULT_ENABLE_READAHEAD = true;
   public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
   public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 7033ae9a4a0..e7ddffe99fd 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -137,7 +137,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
     this.eTag = eTag;
     this.readAheadRange = abfsInputStreamContext.getReadAheadRange();
-    this.readAheadEnabled = true;
+    this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled();
     this.alwaysReadBufferSize
         = abfsInputStreamContext.shouldReadBufferSizeAlways();
     this.bufferedPreadDisabled = abfsInputStreamContext
@@ -745,6 +745,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     return buffer;
   }
 
+  @VisibleForTesting
+  public boolean isReadAheadEnabled() {
+    return readAheadEnabled;
+  }
+
   @VisibleForTesting
   public int getReadAheadRange() {
     return readAheadRange;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index 55f01bf15bc..05afc7b9858 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -35,6 +35,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
 
   private boolean tolerateOobAppends;
 
+  private boolean isReadAheadEnabled = true;
+
   private boolean alwaysReadBufferSize;
 
   private int readAheadBlockSize;
@@ -72,6 +74,12 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
     return this;
   }
 
+  public AbfsInputStreamContext isReadAheadEnabled(
+          final boolean isReadAheadEnabled) {
+    this.isReadAheadEnabled = isReadAheadEnabled;
+    return this;
+  }
+
   public AbfsInputStreamContext withReadAheadRange(
           final int readAheadRange) {
     this.readAheadRange = readAheadRange;
@@ -141,6 +149,10 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
     return tolerateOobAppends;
   }
 
+  public boolean isReadAheadEnabled() {
+    return isReadAheadEnabled;
+  }
+
   public int getReadAheadRange() {
     return readAheadRange;
   }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
index 5bd6eaff42e..beada775ae8 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
-import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
@@ -40,6 +39,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.A
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
 
 /**
  * Test read, write and seek.
@@ -50,18 +50,27 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.M
 public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
   private static final String TEST_PATH = "/testfile";
 
-  @Parameterized.Parameters(name = "Size={0}")
+  /**
+   * Parameterize on read buffer size and readahead.
+   * For test performance, a full x*y test matrix is not used.
+   * @return the test parameters
+   */
+  @Parameterized.Parameters(name = "Size={0}-readahead={1}")
   public static Iterable<Object[]> sizes() {
-    return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE},
-        {DEFAULT_READ_BUFFER_SIZE},
-        {APPENDBLOB_MAX_WRITE_BUFFER_SIZE},
-        {MAX_BUFFER_SIZE}});
+    return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE, true},
+        {DEFAULT_READ_BUFFER_SIZE, false},
+        {DEFAULT_READ_BUFFER_SIZE, true},
+        {APPENDBLOB_MAX_WRITE_BUFFER_SIZE, false},
+        {MAX_BUFFER_SIZE, true}});
   }
 
   private final int size;
+  private final boolean readaheadEnabled;
 
-  public ITestAbfsReadWriteAndSeek(final int size) throws Exception {
+  public ITestAbfsReadWriteAndSeek(final int size,
+      final boolean readaheadEnabled) throws Exception {
     this.size = size;
+    this.readaheadEnabled = readaheadEnabled;
   }
 
   @Test
@@ -74,6 +83,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
     final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
     abfsConfiguration.setWriteBufferSize(bufferSize);
     abfsConfiguration.setReadBufferSize(bufferSize);
+    abfsConfiguration.setReadAheadEnabled(readaheadEnabled);
 
     final byte[] b = new byte[2 * bufferSize];
     new Random().nextBytes(b);
@@ -85,7 +95,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
     } finally{
     stream.close();
     }
-    IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
+    logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
 
     final byte[] readBuffer = new byte[2 * bufferSize];
     int result;
@@ -109,7 +119,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
       inputStream.seek(0);
       result = inputStream.read(readBuffer, 0, bufferSize);
     }
-    IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
+    logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
 
     assertNotEquals("data read in final read()", -1, result);
     assertArrayEquals(readBuffer, b);
@@ -121,6 +131,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
     final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
     int bufferSize = MIN_BUFFER_SIZE;
     abfsConfiguration.setReadBufferSize(bufferSize);
+    abfsConfiguration.setReadAheadEnabled(readaheadEnabled);
 
     final byte[] b = new byte[bufferSize * 10];
     new Random().nextBytes(b);
@@ -132,8 +143,10 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
               ((AbfsOutputStream) stream.getWrappedStream())
                   .getStreamID()));
       stream.write(b);
+      logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
     }
 
+
     final byte[] readBuffer = new byte[4 * bufferSize];
     int result;
     fs.registerListener(
@@ -146,6 +159,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
               ((AbfsInputStream) inputStream.getWrappedStream())
                   .getStreamID()));
       result = inputStream.read(readBuffer, 0, bufferSize*4);
+      logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, inputStream);
     }
     fs.registerListener(null);
   }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java
index 006004850d0..0e7c70e91a9 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java
@@ -130,10 +130,10 @@ public class TestTracingContext extends AbstractAbfsIntegrationTest {
 
     testClasses.put(new ITestAzureBlobFileSystemListStatus(), //liststatus
         ITestAzureBlobFileSystemListStatus.class.getMethod("testListPath"));
-    testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE), //open,
+    testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE, true), //open,
         // read, write
         ITestAbfsReadWriteAndSeek.class.getMethod("testReadAheadRequestID"));
-    testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE), //read (bypassreadahead)
+    testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE, false), //read (bypassreadahead)
         ITestAbfsReadWriteAndSeek.class
             .getMethod("testReadAndWriteWithDifferentBufferSizesAndSeek"));
     testClasses.put(new ITestAzureBlobFileSystemAppend(), //append


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org