You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by mt...@apache.org on 2022/05/02 22:53:09 UTC

[hadoop] 02/02: HADOOP-18104: S3A: Add configs to configure minSeekForVectorReads and maxReadSizeForVectorReads (#3964)

This is an automated email from the ASF dual-hosted git repository.

mthakur pushed a commit to branch feature-vectored-io
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 7d1a2c3e2885f861e4cb2cd4278e2c04baac29bc
Author: Mukund Thakur <mt...@cloudera.com>
AuthorDate: Sat Apr 30 04:17:33 2022 +0530

    HADOOP-18104: S3A: Add configs to configure minSeekForVectorReads and maxReadSizeForVectorReads (#3964)
    
    Part of HADOOP-18103.
    Introducing fs.s3a.vectored.read.min.seek.size and fs.s3a.vectored.read.max.merged.size
    to configure min seek and max read during a vectored IO operation in S3A connector.
    These properties actually define how the ranges will be merged. To completely
    disable merging set fs.s3a.vectored.read.max.merged.size to 0.
    
    Contributed By: Mukund Thakur
---
 .../site/markdown/filesystem/fsdatainputstream.md  |  1 +
 .../contract/AbstractContractVectoredReadTest.java | 21 +++---
 .../hadoop/fs/impl/TestVectoredReadUtils.java      | 24 +++++++
 .../java/org/apache/hadoop/test/MoreAsserts.java   | 12 ++++
 .../java/org/apache/hadoop/fs/s3a/Constants.java   | 27 ++++++++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 25 ++++++-
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   | 21 ++++++
 .../org/apache/hadoop/fs/s3a/S3AReadOpContext.java | 21 +++++-
 .../apache/hadoop/fs/s3a/VectoredIOContext.java    | 78 ++++++++++++++++++++++
 .../site/markdown/tools/hadoop-aws/performance.md  | 30 +++++++++
 .../contract/s3a/ITestS3AContractVectoredRead.java | 54 ++++++++++++++-
 11 files changed, 298 insertions(+), 16 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
index 0fe1772d266..e4a2830967e 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
@@ -474,6 +474,7 @@ end of first and start of next range is more than this value.
 
 Maximum number of bytes which can be read in one go after merging the ranges.
 Two ranges won't be merged if the combined data to be read is more than this value.
+Essentially setting this to 0 will disable the merging of ranges.
 
 ## Consistency
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index eee4b11e739..756c3de85cc 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -18,15 +18,6 @@
 
 package org.apache.hadoop.fs.contract;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -42,6 +33,15 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.FutureIOSupport;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 
@@ -52,7 +52,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
 
   public static final int DATASET_LEN = 64 * 1024;
   private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
-  private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
+  protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
   private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
   private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);
 
@@ -172,6 +172,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     }
   }
 
+  @Test
   public void testSameRanges() throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
index f789f361905..cfd366701be 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
@@ -207,6 +207,30 @@ public class TestVectoredReadUtils extends HadoopTestBase {
             VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
 
   }
+
+  /** Verify that a maximum merged size of 0 leaves every input range unmerged. */
+  @Test
+  public void testMaxSizeZeroDisablesMerging() throws Exception {
+    List<FileRange> randomRanges = Arrays.asList(
+            new FileRangeImpl(3000, 110),
+            new FileRangeImpl(3000, 100),
+            new FileRangeImpl(2100, 100));
+    assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0);
+    assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0);
+    assertEqualRangeCountsAfterMerging(randomRanges, 1, 100, 0);
+  }
+
+  /** Merge the ranges and assert the merged list is as long as the input list. */
+  private void assertEqualRangeCountsAfterMerging(List<FileRange> inputRanges,
+                                                  int chunkSize,
+                                                  int minimumSeek,
+                                                  int maxSize) {
+    List<CombinedFileRange> merged =
+            VectoredReadUtils.sortAndMergeRanges(inputRanges, chunkSize, minimumSeek, maxSize);
+    Assertions.assertThat(merged).describedAs("Mismatch in number of ranges post merging")
+            .hasSize(inputRanges.size());
+  }
+
   interface Stream extends PositionedReadable, ByteBufferPositionedReadable {
     // nothing
   }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java
index f83ef9e63d1..f6e6055d78e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java
@@ -86,4 +86,16 @@ public class MoreAsserts {
                     "completed exceptionally")
             .isTrue();
   }
+
+  /**
+   * Assert that two values of the same type are equal.
+   * @param actual actual value.
+   * @param expected expected value.
+   * @param message what is being compared; reported as "Mismatch in (message)".
+   */
+  public static <T> void assertEqual(T actual, T expected, String message) {
+    Assertions.assertThat(actual)
+            .describedAs("Mismatch in %s", message)
+            .isEqualTo(expected);
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index e5369b84883..02b23ba8f36 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1108,4 +1108,31 @@ public final class Constants {
    * Require that all S3 access is made through Access Points.
    */
   public static final String AWS_S3_ACCESSPOINT_REQUIRED = "fs.s3a.accesspoint.required";
+
+  /**
+   * What is the smallest reasonable seek in bytes such
+   * that we group ranges together during vectored read operation.
+   * Value : {@value}.
+   */
+  public static final String AWS_S3_VECTOR_READS_MIN_SEEK_SIZE =
+          "fs.s3a.vectored.read.min.seek.size";
+
+  /**
+   * What is the largest merged read size in bytes such
+   * that we group ranges together during vectored read.
+   * Setting this value to 0 will disable merging of ranges.
+   * Value : {@value}.
+   */
+  public static final String AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE =
+          "fs.s3a.vectored.read.max.merged.size";
+
+  /**
+   * Default minimum seek in bytes during vectored reads : {@value}.
+   */
+  public static final int DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE = 4096; // 4K
+
+  /**
+   * Default maximum read size in bytes during vectored reads : {@value}.
+   */
+  public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1048576; //1M
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index b18090f3dfe..3558ee5d2d1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -306,6 +306,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    * {@code openFile()}.
    */
   private S3AInputPolicy inputPolicy;
+  /** Vectored IO context. */
+  private VectoredIOContext vectoredIOContext;
+
+  private long readAhead;
   private ChangeDetectionPolicy changeDetectionPolicy;
   private final AtomicBoolean closed = new AtomicBoolean(false);
   private volatile boolean isClosed = false;
@@ -577,6 +581,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
                         DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
           inputPolicy);
+      vectoredIOContext = populateVectoredIOContext(conf);
     } catch (AmazonClientException e) {
       // amazon client exception: stop all services then throw the translation
       cleanupWithLogger(LOG, span);
@@ -590,6 +595,23 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     }
   }
 
+  /**
+   * Populates the vectored IO configuration in the context which is passed down
+   * to input streams; sizes over Integer.MAX_VALUE are clamped, never wrapped.
+   * @param conf configuration object.
+   * @return VectoredIOContext.
+   */
+  private VectoredIOContext populateVectoredIOContext(Configuration conf) {
+    final long minSeek = longBytesOption(conf, AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
+            DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, 0);
+    final long maxSize = longBytesOption(conf, AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+            DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, 0);
+    return new VectoredIOContext()
+            .setMinSeekForVectoredReads((int) Math.min(minSeek, Integer.MAX_VALUE))
+            .setMaxReadSizeForVectoredReads((int) Math.min(maxSize, Integer.MAX_VALUE))
+            .build();
+  }
+
   /**
    * Set the client side encryption gauge to 0 or 1, indicating if CSE is
    * enabled through the gauge or not.
@@ -1532,7 +1554,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         invoker,
         statistics,
         statisticsContext,
-        fileStatus)
+        fileStatus,
+        vectoredIOContext)
         .withAuditSpan(auditSpan);
     openFileHelper.applyDefaultOptions(roc);
     return roc.build();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 05d9c7f9fe9..23f31df1645 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -145,6 +145,9 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   private S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
 
+  /** Vectored IO context. */
+  private final VectoredIOContext vectoredIOContext;
+
   /**
    * This is the actual position within the object, used by
    * lazy seek to decide whether to seek on the next read or not.
@@ -212,6 +215,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     setReadahead(ctx.getReadahead());
     this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
     this.unboundedThreadPool = unboundedThreadPool;
+    this.vectoredIOContext = context.getVectoredIOContext();
   }
 
   /**
@@ -859,6 +863,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
       sb.append(" remainingInCurrentRequest=")
           .append(remainingInCurrentRequest());
       sb.append(" ").append(changeTracker);
+      sb.append(" ").append(vectoredIOContext);
       sb.append('\n').append(s);
       sb.append('}');
       return sb.toString();
@@ -905,6 +910,22 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     }
   }
 
+  /**
+   * {@inheritDoc} Here the value is read from this stream's vectored IO context.
+   */
+  @Override
+  public int minSeekForVectorReads() {
+    return vectoredIOContext.getMinSeekForVectorReads();
+  }
+
+  /**
+   * {@inheritDoc} Here the value is read from this stream's vectored IO context.
+   */
+  @Override
+  public int maxReadSizeForVectorReads() {
+    return vectoredIOContext.getMaxReadSizeForVectorReads();
+  }
+
   /**
    * {@inheritDoc}
    * Vectored read implementation for S3AInputStream.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
index f416cf9485d..29e3df1af12 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
@@ -64,6 +64,12 @@ public class S3AReadOpContext extends S3AOpContext {
    */
   private long asyncDrainThreshold;
 
+  /**
+   * Vectored IO context for vectored read api
+   * in {@code S3AInputStream#readVectored(List, IntFunction)}.
+   */
+  private final VectoredIOContext vectoredIOContext;
+
   /**
    * Instantiate.
    * @param path path of read
@@ -71,17 +77,19 @@ public class S3AReadOpContext extends S3AOpContext {
    * @param stats Fileystem statistics (may be null)
    * @param instrumentation statistics context
    * @param dstFileStatus target file status
+   * @param vectoredIOContext context for vectored read operation.
    */
   public S3AReadOpContext(
       final Path path,
       Invoker invoker,
       @Nullable FileSystem.Statistics stats,
       S3AStatisticsContext instrumentation,
-      FileStatus dstFileStatus) {
-
+      FileStatus dstFileStatus,
+      VectoredIOContext vectoredIOContext) {
     super(invoker, stats, instrumentation,
         dstFileStatus);
     this.path = requireNonNull(path);
+    this.vectoredIOContext = requireNonNull(vectoredIOContext, "vectoredIOContext");
   }
 
   /**
@@ -199,6 +207,14 @@ public class S3AReadOpContext extends S3AOpContext {
     return asyncDrainThreshold;
   }
 
+  /**
+   * Get the vectored IO context for this read op.
+   * @return vectored IO context.
+   */
+  public VectoredIOContext getVectoredIOContext() {
+    return vectoredIOContext;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/VectoredIOContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/VectoredIOContext.java
new file mode 100644
index 00000000000..31f0ae4cb55
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/VectoredIOContext.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.util.List;
+import java.util.function.IntFunction;
+
+/**
+ * Carries the settings which control range merging in a vectored IO read.
+ * See {@link S3AInputStream#readVectored(List, IntFunction)}.
+ */
+public class VectoredIOContext {
+
+  /**
+   * Smallest reasonable seek for grouping ranges during a vectored read.
+   */
+  private int minSeekForVectorReads;
+
+  /**
+   * Largest size of a grouped range in a vectored read; 0 disables merging.
+   */
+  private int maxReadSizeForVectorReads;
+
+  /** Default no arg constructor. */
+  public VectoredIOContext() {
+  }
+
+  /** Set the minimum seek; returns this context so calls can be chained. */
+  public VectoredIOContext setMinSeekForVectoredReads(int minSeek) {
+    minSeekForVectorReads = minSeek;
+    return this;
+  }
+
+  /** Set the maximum merged read size; returns this context for chaining. */
+  public VectoredIOContext setMaxReadSizeForVectoredReads(int maxSize) {
+    maxReadSizeForVectorReads = maxSize;
+    return this;
+  }
+
+  /** Complete the chain of setters; returns this same instance, not a copy. */
+  public VectoredIOContext build() {
+    return this;
+  }
+
+  /** @return the minimum seek for vectored reads. */
+  public int getMinSeekForVectorReads() {
+    return this.minSeekForVectorReads;
+  }
+
+  /** @return the maximum merged read size for vectored reads. */
+  public int getMaxReadSizeForVectorReads() {
+    return this.maxReadSizeForVectorReads;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("VectoredIOContext{");
+    sb.append("minSeekForVectorReads=").append(minSeekForVectorReads);
+    sb.append(", maxReadSizeForVectorReads=").append(maxReadSizeForVectorReads);
+    return sb.append('}').toString();
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
index f398c4cbcbe..06eb137cd9b 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
@@ -55,6 +55,36 @@ it isn't, and some attempts to preserve the metaphor are "aggressively suboptima
 
 To make most efficient use of S3, care is needed.
 
+## <a name="vectoredIO"></a> Improving read performance using Vectored IO
+The S3A FileSystem implements the vectored read API, through which a client
+can provide a list of file ranges to read and receive a future read object
+associated with each range. For the full API specification please see
+[FSDataInputStream](../../hadoop-common-project/hadoop-common/filesystem/fsdatainputstream.html).
+
+The following properties can be configured to optimise vectored reads based
+on the client requirements.
+
+```xml
+<property>
+  <name>fs.s3a.vectored.read.min.seek.size</name>
+  <value>4K</value>
+  <description>
+    What is the smallest reasonable seek in bytes such
+    that we group ranges together during vectored
+    read operation.
+  </description>
+</property>
+<property>
+  <name>fs.s3a.vectored.read.max.merged.size</name>
+  <value>1M</value>
+  <description>
+    What is the largest merged read size in bytes such
+    that we group ranges together during vectored read.
+    Setting this value to 0 will disable merging of ranges.
+  </description>
+</property>
+```
+
 ## <a name="fadvise"></a> Improving data input performance through fadvise
 
 The S3A Filesystem client supports the notion of input policies, similar
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index 255cc6501c2..0752e75d247 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -18,15 +18,23 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.FileRangeImpl;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 
-import java.util.ArrayList;
-import java.util.List;
+import static org.apache.hadoop.test.MoreAsserts.assertEqual;
 
 public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {
 
@@ -42,7 +50,6 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
   /**
    * Overriding in S3 vectored read api fails fast in case of EOF
    * requested range.
-   * @throws Exception
    */
   @Override
   public void testEOFRanges() throws Exception {
@@ -51,4 +58,45 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
     fileRanges.add(new FileRangeImpl(DATASET_LEN, 100));
     testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected");
   }
+
+  @Test
+  public void testMinSeekAndMaxSizeConfigsPropagation() throws Exception {
+    Configuration conf = getFileSystem().getConf();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf,
+            Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+            Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
+    S3ATestUtils.disableFilesystemCaching(conf);
+    final int configuredMinSeek = 2 * 1024;
+    final int configuredMaxSize = 10 * 1024 * 1024;
+    conf.set(Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, "2K");
+    conf.set(Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, "10M");
+    try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+      try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) {
+        int newMinSeek = fis.minSeekForVectorReads();
+        int newMaxSize = fis.maxReadSizeForVectorReads();
+        assertEqual(newMinSeek, configuredMinSeek,
+                "configured s3a min seek for vectored reads");
+        assertEqual(newMaxSize, configuredMaxSize,
+                "configured s3a max size for vectored reads");
+      }
+    }
+  }
+
+  @Test
+  public void testMinSeekAndMaxSizeDefaultValues() throws Exception {
+    Configuration conf = getFileSystem().getConf();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf,
+            Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
+            Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE);
+    try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+      try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) {
+        int minSeek = fis.minSeekForVectorReads();
+        int maxSize = fis.maxReadSizeForVectorReads();
+        assertEqual(minSeek, Constants.DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
+                "default s3a min seek for vectored reads");
+        assertEqual(maxSize, Constants.DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+                "default s3a max read size for vectored reads");
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org