Posted to common-commits@hadoop.apache.org by mt...@apache.org on 2022/04/29 22:47:53 UTC

[hadoop] branch feature-vectored-io updated: HADOOP-18104: S3A: Add configs to configure minSeekForVectorReads and maxReadSizeForVectorReads (#3964)

This is an automated email from the ASF dual-hosted git repository.

mthakur pushed a commit to branch feature-vectored-io
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/feature-vectored-io by this push:
     new d441c0f64f9 HADOOP-18104: S3A: Add configs to configure minSeekForVectorReads and maxReadSizeForVectorReads (#3964)
d441c0f64f9 is described below

commit d441c0f64f90bf87a04ed094db9d2eda4a5b8bef
Author: Mukund Thakur <mt...@cloudera.com>
AuthorDate: Sat Apr 30 04:17:33 2022 +0530

    HADOOP-18104: S3A: Add configs to configure minSeekForVectorReads and maxReadSizeForVectorReads (#3964)
    
    * HADOOP-18104: S3A: Add configs to configure minSeekForVectorReads and maxReadSizeForVectorReads
    
    Part of HADOOP-18103.
    Introducing fs.s3a.vectored.read.min.seek.size and fs.s3a.vectored.read.max.merged.size
    to configure the minimum seek and the maximum merged read size during a vectored
    IO operation in the S3A connector. These properties define how ranges are merged.
    To completely disable merging, set fs.s3a.vectored.read.max.merged.size to 0.
    
    Contributed By: Mukund Thakur
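
As a quick illustration (a minimal sketch, not part of the commit itself): the
new options are parsed via longBytesOption, so suffixed sizes such as "2K" or
"10M" are accepted, as the tests in this patch also demonstrate.

```java
// Hedged sketch: configure range merging for S3A vectored reads.
Configuration conf = new Configuration();
// Merge ranges whose inter-range gap is under 2K...
conf.set("fs.s3a.vectored.read.min.seek.size", "2K");
// ...but never into a single merged read larger than 10M.
conf.set("fs.s3a.vectored.read.max.merged.size", "10M");
// A value of "0" for the max merged size disables merging entirely.
```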
---
 .../site/markdown/filesystem/fsdatainputstream.md  |  1 +
 .../contract/AbstractContractVectoredReadTest.java | 21 +++---
 .../hadoop/fs/impl/TestVectoredReadUtils.java      | 24 +++++++
 .../java/org/apache/hadoop/test/MoreAsserts.java   | 12 ++++
 .../java/org/apache/hadoop/fs/s3a/Constants.java   | 27 ++++++++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 24 ++++++-
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   | 21 ++++++
 .../org/apache/hadoop/fs/s3a/S3AReadOpContext.java | 35 +++++++---
 .../apache/hadoop/fs/s3a/VectoredIOContext.java    | 78 ++++++++++++++++++++++
 .../site/markdown/tools/hadoop-aws/performance.md  | 30 +++++++++
 .../contract/s3a/ITestS3AContractVectoredRead.java | 54 ++++++++++++++-
 11 files changed, 304 insertions(+), 23 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
index 0fe1772d266..e4a2830967e 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
@@ -474,6 +474,7 @@ end of first and start of next range is more than this value.
 
 Maximum number of bytes which can be read in one go after merging the ranges.
 Two ranges won't be merged if the combined data to be read is more than this value.
+Setting this value to 0 will disable the merging of ranges.
 
 ## Consistency
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index eee4b11e739..756c3de85cc 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -18,15 +18,6 @@
 
 package org.apache.hadoop.fs.contract;
 
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileRange;
-import org.apache.hadoop.fs.FileRangeImpl;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.impl.FutureIOSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -42,6 +33,15 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileRangeImpl;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.FutureIOSupport;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 
@@ -52,7 +52,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
 
   public static final int DATASET_LEN = 64 * 1024;
   private static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
-  private static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
+  protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
   private static final String VECTORED_READ_FILE_1MB_NAME = "vectored_file_1M.txt";
   private static final byte[] DATASET_MB = ContractTestUtils.dataset(1024 * 1024, 'a', 256);
 
@@ -172,6 +172,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     }
   }
 
+  @Test
   public void testSameRanges() throws Exception {
     FileSystem fs = getFileSystem();
     List<FileRange> fileRanges = new ArrayList<>();
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
index f789f361905..cfd366701be 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/TestVectoredReadUtils.java
@@ -207,6 +207,30 @@ public class TestVectoredReadUtils extends HadoopTestBase {
             VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
 
   }
+
+  @Test
+  public void testMaxSizeZeroDisablesMerging() throws Exception {
+    List<FileRange> randomRanges = Arrays.asList(
+            new FileRangeImpl(3000, 110),
+            new FileRangeImpl(3000, 100),
+            new FileRangeImpl(2100, 100)
+    );
+    assertEqualRangeCountsAfterMerging(randomRanges, 1, 1, 0);
+    assertEqualRangeCountsAfterMerging(randomRanges, 1, 0, 0);
+    assertEqualRangeCountsAfterMerging(randomRanges, 1, 100, 0);
+  }
+
+  private void assertEqualRangeCountsAfterMerging(List<FileRange> inputRanges,
+                                                  int chunkSize,
+                                                  int minimumSeek,
+                                                  int maxSize) {
+    List<CombinedFileRange> combinedFileRanges = VectoredReadUtils
+            .sortAndMergeRanges(inputRanges, chunkSize, minimumSeek, maxSize);
+    Assertions.assertThat(combinedFileRanges)
+            .describedAs("Mismatch in number of ranges post merging")
+            .hasSize(inputRanges.size());
+  }
+
   interface Stream extends PositionedReadable, ByteBufferPositionedReadable {
     // nothing
   }
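
To make the merge semantics concrete, a hedged sketch using the
sortAndMergeRanges helper exercised above (same signature as in the test):

```java
// Two ranges separated by a 2000-byte gap.
List<FileRange> ranges = Arrays.asList(
    new FileRangeImpl(0, 100),
    new FileRangeImpl(2100, 100));

// minSeek=4096: the 2000-byte gap is below the threshold, and the merged
// read (2200 bytes) is below maxSize, so both ranges combine into one request.
List<CombinedFileRange> merged =
    VectoredReadUtils.sortAndMergeRanges(ranges, 1, 4096, 1048576);
// merged.size() == 1

// maxSize=0: no merged read may exceed 0 bytes, so nothing is combined:
// VectoredReadUtils.sortAndMergeRanges(ranges, 1, 4096, 0).size() == 2
```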
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java
index f83ef9e63d1..f6e6055d78e 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MoreAsserts.java
@@ -86,4 +86,16 @@ public class MoreAsserts {
                     "completed exceptionally")
             .isTrue();
   }
+
+  /**
+   * Assert that two values of the same type are equal.
+   * @param actual actual value.
+   * @param expected expected value.
+   * @param message error message to print in case of mismatch.
+   */
+  public static <T> void assertEqual(T actual, T expected, String message) {
+    Assertions.assertThat(actual)
+            .describedAs("Mismatch in %s", message)
+            .isEqualTo(expected);
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index dd7e4258809..8dc7bef8f09 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1066,4 +1066,31 @@ public final class Constants {
    * Require that all S3 access is made through Access Points.
    */
   public static final String AWS_S3_ACCESSPOINT_REQUIRED = "fs.s3a.accesspoint.required";
+
+  /**
+   * What is the smallest reasonable seek in bytes such
+   * that we group ranges together during a vectored read operation.
+   * Value : {@value}.
+   */
+  public static final String AWS_S3_VECTOR_READS_MIN_SEEK_SIZE =
+          "fs.s3a.vectored.read.min.seek.size";
+
+  /**
+   * What is the largest merged read size in bytes such
+   * that we group ranges together during a vectored read.
+   * Setting this value to 0 will disable merging of ranges.
+   * Value : {@value}.
+   */
+  public static final String AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE =
+          "fs.s3a.vectored.read.max.merged.size";
+
+  /**
+   * Default minimum seek in bytes during vectored reads : {@value}.
+   */
+  public static final int DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE = 4096; // 4K
+
+  /**
+   * Default maximum read size in bytes during vectored reads : {@value}.
+   */
+  public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1048576; // 1M
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index f37577851bc..c80fb71576d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -296,6 +296,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   /** Storage Statistics Bonded to the instrumentation. */
   private S3AStorageStatistics storageStatistics;
 
+  /** Vectored IO context. */
+  private VectoredIOContext vectoredIOContext;
+
   private long readAhead;
   private S3AInputPolicy inputPolicy;
   private ChangeDetectionPolicy changeDetectionPolicy;
@@ -551,6 +554,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
           BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
       listing = new Listing(listingOperationCallbacks, createStoreContext());
+      vectoredIOContext = populateVectoredIOContext(conf);
     } catch (AmazonClientException e) {
       // amazon client exception: stop all services then throw the translation
       cleanupWithLogger(LOG, span);
@@ -564,6 +568,23 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     }
   }
 
+  /**
+   * Populates the configurations related to vectored IO operation
+   * in the context which has to be passed down to input streams.
+   * @param conf configuration object.
+   * @return VectoredIOContext.
+   */
+  private VectoredIOContext populateVectoredIOContext(Configuration conf) {
+    final int minSeekVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
+            DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, 0);
+    final int maxReadSizeVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+            DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, 0);
+    return new VectoredIOContext()
+            .setMinSeekForVectoredReads(minSeekVectored)
+            .setMaxReadSizeForVectoredReads(maxReadSizeVectored)
+            .build();
+  }
+
   /**
    * Set the client side encryption gauge to 0 or 1, indicating if CSE is
    * enabled through the gauge or not.
@@ -1526,7 +1547,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         seekPolicy,
         changePolicy,
         readAheadRange,
-        auditSpan);
+        auditSpan,
+        vectoredIOContext);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 73a13cde713..257a971ded5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -137,6 +137,9 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   private S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
 
+  /** Vectored IO context. */
+  private final VectoredIOContext vectoredIOContext;
+
   /**
    * This is the actual position within the object, used by
    * lazy seek to decide whether to seek on the next read or not.
@@ -197,6 +200,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     setInputPolicy(ctx.getInputPolicy());
     setReadahead(ctx.getReadahead());
     this.unboundedThreadPool = unboundedThreadPool;
+    this.vectoredIOContext = ctx.getVectoredIOContext();
   }
 
   /**
@@ -764,6 +768,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
       sb.append(" remainingInCurrentRequest=")
           .append(remainingInCurrentRequest());
       sb.append(" ").append(changeTracker);
+      sb.append(" ").append(vectoredIOContext);
       sb.append('\n').append(s);
       sb.append('}');
       return sb.toString();
@@ -810,6 +815,22 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     }
   }
 
+  /**
+   * {@inheritDoc}.
+   */
+  @Override
+  public int minSeekForVectorReads() {
+    return vectoredIOContext.getMinSeekForVectorReads();
+  }
+
+  /**
+   * {@inheritDoc}.
+   */
+  @Override
+  public int maxReadSizeForVectorReads() {
+    return vectoredIOContext.getMaxReadSizeForVectorReads();
+  }
+
   /**
    * {@inheritDoc}
    * Vectored read implementation for S3AInputStream.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
index 5ce3d96c435..22d7284d612 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
@@ -58,6 +58,12 @@ public class S3AReadOpContext extends S3AOpContext {
 
   private final AuditSpan auditSpan;
 
+  /**
+   * Vectored IO context for vectored read api
+   * in {@code S3AInputStream#readVectored(List, IntFunction)}.
+   */
+  private final VectoredIOContext vectoredIOContext;
+
   /**
    * Instantiate.
    * @param path path of read
@@ -69,17 +75,19 @@ public class S3AReadOpContext extends S3AOpContext {
    * @param changeDetectionPolicy change detection policy.
    * @param readahead readahead for GET operations/skip, etc.
    * @param auditSpan active audit
+   * @param vectoredIOContext context for vectored read operation.
    */
   public S3AReadOpContext(
-      final Path path,
-      Invoker invoker,
-      @Nullable FileSystem.Statistics stats,
-      S3AStatisticsContext instrumentation,
-      FileStatus dstFileStatus,
-      S3AInputPolicy inputPolicy,
-      ChangeDetectionPolicy changeDetectionPolicy,
-      final long readahead,
-      final AuditSpan auditSpan) {
+        final Path path,
+        Invoker invoker,
+        @Nullable FileSystem.Statistics stats,
+        S3AStatisticsContext instrumentation,
+        FileStatus dstFileStatus,
+        S3AInputPolicy inputPolicy,
+        ChangeDetectionPolicy changeDetectionPolicy,
+        final long readahead,
+        final AuditSpan auditSpan,
+        VectoredIOContext vectoredIOContext) {
 
     super(invoker, stats, instrumentation,
         dstFileStatus);
@@ -90,6 +98,7 @@ public class S3AReadOpContext extends S3AOpContext {
     this.inputPolicy = checkNotNull(inputPolicy);
     this.changeDetectionPolicy = checkNotNull(changeDetectionPolicy);
     this.readahead = readahead;
+    this.vectoredIOContext = checkNotNull(vectoredIOContext);
   }
 
   /**
@@ -136,6 +145,14 @@ public class S3AReadOpContext extends S3AOpContext {
     return auditSpan;
   }
 
+  /**
+   * Get the vectored IO context for this read op.
+   * @return vectored IO context.
+   */
+  public VectoredIOContext getVectoredIOContext() {
+    return vectoredIOContext;
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/VectoredIOContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/VectoredIOContext.java
new file mode 100644
index 00000000000..31f0ae4cb55
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/VectoredIOContext.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.util.List;
+import java.util.function.IntFunction;
+
+/**
+ * Context related to vectored IO operation.
+ * See {@link S3AInputStream#readVectored(List, IntFunction)}.
+ */
+public class VectoredIOContext {
+
+  /**
+   * What is the smallest reasonable seek in bytes such that we
+   * group ranges together during a vectored read operation.
+   */
+  private int minSeekForVectorReads;
+
+  /**
+   * What is the largest merged read size in bytes up to which we
+   * group ranges together during a vectored read operation.
+   * Setting this value to 0 will disable merging of ranges.
+   */
+  private int maxReadSizeForVectorReads;
+
+  /**
+   * Default no arg constructor.
+   */
+  public VectoredIOContext() {
+  }
+
+  public VectoredIOContext setMinSeekForVectoredReads(int minSeek) {
+    this.minSeekForVectorReads = minSeek;
+    return this;
+  }
+
+  public VectoredIOContext setMaxReadSizeForVectoredReads(int maxSize) {
+    this.maxReadSizeForVectorReads = maxSize;
+    return this;
+  }
+
+  public VectoredIOContext build() {
+    return this;
+  }
+
+  public int getMinSeekForVectorReads() {
+    return minSeekForVectorReads;
+  }
+
+  public int getMaxReadSizeForVectorReads() {
+    return maxReadSizeForVectorReads;
+  }
+
+  @Override
+  public String toString() {
+    return "VectoredIOContext{" +
+            "minSeekForVectorReads=" + minSeekForVectorReads +
+            ", maxReadSizeForVectorReads=" + maxReadSizeForVectorReads +
+            '}';
+  }
+}
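
For reference, a usage sketch of the builder above, mirroring
populateVectoredIOContext in S3AFileSystem earlier in this patch:

```java
// 4K min seek, 1M max merged read size (the documented defaults).
VectoredIOContext context = new VectoredIOContext()
    .setMinSeekForVectoredReads(4096)
    .setMaxReadSizeForVectoredReads(1024 * 1024)
    .build();  // build() currently just returns this
```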
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
index f398c4cbcbe..06eb137cd9b 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md
@@ -55,6 +55,36 @@ it isn't, and some attempts to preserve the metaphor are "aggressively suboptima
 
 To make most efficient use of S3, care is needed.
 
+## <a name="vectoredIO"></a> Improving read performance using Vectored IO
+The S3A FileSystem supports the vectored read API, through which a client
+can provide a list of file ranges to read; a future is returned for the data
+of each range. For the full API specification, please see
+[FSDataInputStream](../../hadoop-common-project/hadoop-common/filesystem/fsdatainputstream.html).
+
+The following properties can be configured to optimise vectored reads based
+on client requirements.
+
+```xml
+<property>
+  <name>fs.s3a.vectored.read.min.seek.size</name>
+  <value>4K</value>
+  <description>
+     What is the smallest reasonable seek in bytes such
+     that we group ranges together during a vectored
+     read operation.
+  </description>
+</property>
+<property>
+  <name>fs.s3a.vectored.read.max.merged.size</name>
+  <value>1M</value>
+  <description>
+     What is the largest merged read size in bytes such
+     that we group ranges together during a vectored read.
+     Setting this value to 0 will disable merging of ranges.
+  </description>
+</property>
+```
+
 ## <a name="fadvise"></a> Improving data input performance through fadvise
 
 The S3A Filesystem client supports the notion of input policies, similar
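
For readers unfamiliar with the API this documentation refers to, here is a
hedged sketch of a vectored read through S3A. It assumes FileRange#getData()
exposes the per-range future, as on the feature branch, and uses
FutureIOSupport, which the contract tests in this patch import; the path is
hypothetical.

```java
List<FileRange> ranges = new ArrayList<>();
ranges.add(new FileRangeImpl(0, 100));     // first 100 bytes
ranges.add(new FileRangeImpl(8192, 100));  // 100 bytes from offset 8K

try (FSDataInputStream in = fs.open(new Path("s3a://bucket/vectored_file.txt"))) {
  // Kick off the (possibly merged) asynchronous reads.
  in.readVectored(ranges, ByteBuffer::allocate);
  for (FileRange range : ranges) {
    // Block for each range's data as it completes.
    ByteBuffer data = FutureIOSupport.awaitFuture(range.getData());
    // ... consume data ...
  }
}
```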
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index 255cc6501c2..0752e75d247 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -18,15 +18,23 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.FileRangeImpl;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 
-import java.util.ArrayList;
-import java.util.List;
+import static org.apache.hadoop.test.MoreAsserts.assertEqual;
 
 public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {
 
@@ -42,7 +50,6 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
   /**
    * Overridden because the S3A vectored read API fails fast
    * when a range beyond EOF is requested.
-   * @throws Exception
    */
   @Override
   public void testEOFRanges() throws Exception {
@@ -51,4 +58,45 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
     fileRanges.add(new FileRangeImpl(DATASET_LEN, 100));
     testExceptionalVectoredRead(fs, fileRanges, "EOFException is expected");
   }
+
+  @Test
+  public void testMinSeekAndMaxSizeConfigsPropagation() throws Exception {
+    Configuration conf = getFileSystem().getConf();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf,
+            Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+            Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
+    S3ATestUtils.disableFilesystemCaching(conf);
+    final int configuredMinSeek = 2 * 1024;
+    final int configuredMaxSize = 10 * 1024 * 1024;
+    conf.set(Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, "2K");
+    conf.set(Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, "10M");
+    try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+      try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) {
+        int newMinSeek = fis.minSeekForVectorReads();
+        int newMaxSize = fis.maxReadSizeForVectorReads();
+        assertEqual(newMinSeek, configuredMinSeek,
+                "configured s3a min seek for vectored reads");
+        assertEqual(newMaxSize, configuredMaxSize,
+                "configured s3a max size for vectored reads");
+      }
+    }
+  }
+
+  @Test
+  public void testMinSeekAndMaxSizeDefaultValues() throws Exception {
+    Configuration conf = getFileSystem().getConf();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf,
+            Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
+            Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE);
+    try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+      try (FSDataInputStream fis = fs.open(path(VECTORED_READ_FILE_NAME))) {
+        int minSeek = fis.minSeekForVectorReads();
+        int maxSize = fis.maxReadSizeForVectorReads();
+        assertEqual(minSeek, Constants.DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
+                "default s3a min seek for vectored reads");
+        assertEqual(maxSize, Constants.DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+                "default s3a max read size for vectored reads");
+      }
+    }
+  }
 }
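
Since the tests above call removeBaseAndBucketOverrides, the standard S3A
per-bucket override pattern presumably applies to these options as well; a
hedged sketch with a hypothetical bucket name:

```java
// Hypothetical bucket "mybucket": per-bucket settings follow the usual
// fs.s3a.bucket.NAME.* convention and take precedence over the base keys.
conf.set("fs.s3a.bucket.mybucket.vectored.read.min.seek.size", "8K");
conf.set("fs.s3a.bucket.mybucket.vectored.read.max.merged.size", "2M");
```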

