Posted to common-commits@hadoop.apache.org by mt...@apache.org on 2022/07/28 16:27:52 UTC

[hadoop] branch trunk updated: HADOOP-18227. Add input stream IOStats for vectored IO api in S3A. (#4636)

This is an automated email from the ASF dual-hosted git repository.

mthakur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a5b12c80104 HADOOP-18227. Add input stream IOStats for vectored IO api in S3A. (#4636)
a5b12c80104 is described below

commit a5b12c8010447901d269309633f500287cf0d636
Author: Mukund Thakur <mt...@cloudera.com>
AuthorDate: Thu Jul 28 21:57:37 2022 +0530

    HADOOP-18227. Add input stream IOStats for vectored IO api in S3A. (#4636)
    
    part of HADOOP-18103.
    
    Contributed By: Mukund Thakur
---
 .../hadoop/fs/statistics/StreamStatisticNames.java |  30 +++-
 .../contract/AbstractContractVectoredReadTest.java |  11 ++
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   |   6 +-
 .../apache/hadoop/fs/s3a/S3AInstrumentation.java   |  29 ++++
 .../s3a/statistics/S3AInputStreamStatistics.java   |  14 ++
 .../statistics/impl/EmptyS3AStatisticsContext.java |  11 ++
 .../contract/s3a/ITestS3AContractVectoredRead.java | 171 +++++++++++++++++++++
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  31 ++++
 .../s3a/scale/ITestS3AInputStreamPerformance.java  |   2 +
 .../hadoop/fs/s3a/scale/S3AScaleTestBase.java      |  33 ----
 10 files changed, 303 insertions(+), 35 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index ca755f08419..bb697ad8ccf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -47,7 +47,7 @@ public final class StreamStatisticNames {
   public static final String STREAM_READ_ABORTED = "stream_aborted";
 
   /**
-   * Bytes read from an input stream in read() calls.
+   * Bytes read from an input stream in read()/readVectored() calls.
    * Does not include bytes read and then discarded in seek/close etc.
    * These are the bytes returned to the caller.
    * Value: {@value}.
@@ -110,6 +110,34 @@ public final class StreamStatisticNames {
   public static final String STREAM_READ_OPERATIONS =
       "stream_read_operations";
 
+  /**
+   * Count of readVectored() operations in an input stream.
+   * Value: {@value}.
+   */
+  public static final String STREAM_READ_VECTORED_OPERATIONS =
+          "stream_read_vectored_operations";
+
+  /**
+   * Count of bytes discarded during readVectored() operation
+   * in an input stream.
+   * Value: {@value}.
+   */
+  public static final String STREAM_READ_VECTORED_READ_BYTES_DISCARDED =
+          "stream_read_vectored_read_bytes_discarded";
+
+  /**
+   * Count of incoming file ranges during readVectored() operation.
+   * Value: {@value}.
+   */
+  public static final String STREAM_READ_VECTORED_INCOMING_RANGES =
+          "stream_read_vectored_incoming_ranges";
+  /**
+   * Count of combined file ranges during readVectored() operation.
+   * Value: {@value}.
+   */
+  public static final String STREAM_READ_VECTORED_COMBINED_RANGES =
+          "stream_read_vectored_combined_ranges";
+
   /**
    * Count of incomplete read() operations in an input stream,
    * that is, when the bytes returned were less than that requested.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index 77bcc496ff4..379b992fba1 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -84,6 +84,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     return allocate;
   }
 
+  public WeakReferencedElasticByteBufferPool getPool() {
+    return pool;
+  }
+
   @Override
   public void setup() throws Exception {
     super.setup();
@@ -382,6 +386,13 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
     return fileRanges;
   }
 
+  protected List<FileRange> getConsecutiveRanges() {
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(100, 500));
+    fileRanges.add(FileRange.createFileRange(600, 500));
+    return fileRanges;
+  }
+
   /**
    * Validate that exceptions must be thrown during a vectored
    * read operation with specific input ranges.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 178a807733a..c20c3a04863 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -963,7 +963,6 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   @Override
   public void readVectored(List<? extends FileRange> ranges,
                            IntFunction<ByteBuffer> allocate) throws IOException {
-
     LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges);
     checkNotClosed();
     if (stopVectoredIOOperations.getAndSet(false)) {
@@ -978,6 +977,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
 
     if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
       LOG.debug("Not merging the ranges as they are disjoint");
+      streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size());
       for (FileRange range: sortedRanges) {
         ByteBuffer buffer = allocate.apply(range.getLength());
         unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
@@ -987,6 +987,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
       List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
               1, minSeekForVectorReads(),
               maxReadSizeForVectorReads());
+      streamStatistics.readVectoredOperationStarted(sortedRanges.size(), combinedFileRanges.size());
       LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
               ranges.size(), combinedFileRanges.size());
       for (CombinedFileRange combinedFileRange: combinedFileRanges) {
@@ -1088,6 +1089,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
       }
       drainBytes += readCount;
     }
+    streamStatistics.readVectoredBytesDiscarded(drainBytes);
     LOG.debug("{} bytes drained from stream ", drainBytes);
   }
 
@@ -1168,6 +1170,8 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     } else {
       readByteArray(objectContent, buffer.array(), 0, length);
     }
+    // update io stats.
+    incrementBytesRead(length);
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 67734b75029..0a49f564616 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -803,6 +803,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
     private final AtomicLong readOperations;
     private final AtomicLong readFullyOperations;
     private final AtomicLong seekOperations;
+    private final AtomicLong readVectoredOperations;
+    private final AtomicLong bytesDiscardedInVectoredIO;
+    private final AtomicLong readVectoredIncomingRanges;
+    private final AtomicLong readVectoredCombinedRanges;
 
     /** Bytes read by the application and any when draining streams . */
     private final AtomicLong totalBytesRead;
@@ -836,6 +840,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
               StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED,
               StreamStatisticNames.STREAM_READ_TOTAL_BYTES,
               StreamStatisticNames.STREAM_READ_UNBUFFERED,
+              StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+              StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+              StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+              StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
               StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES)
           .withGauges(STREAM_READ_GAUGE_INPUT_POLICY)
           .withDurationTracking(ACTION_HTTP_GET_REQUEST,
@@ -872,6 +880,14 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
           StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE);
       readOperations = st.getCounterReference(
           StreamStatisticNames.STREAM_READ_OPERATIONS);
+      readVectoredOperations = st.getCounterReference(
+          StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS);
+      bytesDiscardedInVectoredIO =  st.getCounterReference(
+              StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED);
+      readVectoredIncomingRanges = st.getCounterReference(
+              StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES);
+      readVectoredCombinedRanges = st.getCounterReference(
+              StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES);
       readFullyOperations = st.getCounterReference(
           StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS);
       seekOperations = st.getCounterReference(
@@ -1017,6 +1033,19 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
       }
     }
 
+    @Override
+    public void readVectoredOperationStarted(int numIncomingRanges,
+                                             int numCombinedRanges) {
+      readVectoredIncomingRanges.addAndGet(numIncomingRanges);
+      readVectoredCombinedRanges.addAndGet(numCombinedRanges);
+      readVectoredOperations.incrementAndGet();
+    }
+
+    @Override
+    public void readVectoredBytesDiscarded(int discarded) {
+      bytesDiscardedInVectoredIO.addAndGet(discarded);
+    }
+
     /**
      * {@code close()} merges the stream statistics into the filesystem's
      * instrumentation instance.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
index 539af2bde36..41a8f253159 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
@@ -96,6 +96,20 @@ public interface S3AInputStreamStatistics extends AutoCloseable,
    */
   void readOperationCompleted(int requested, int actual);
 
+  /**
+   * A vectored read operation has started.
+   * @param numIncomingRanges number of input ranges.
+   * @param numCombinedRanges number of combined ranges.
+   */
+  void readVectoredOperationStarted(int numIncomingRanges,
+                                    int numCombinedRanges);
+
+  /**
+   * Number of bytes discarded during vectored read.
+   * @param discarded discarded bytes during vectored read.
+   */
+  void readVectoredBytesDiscarded(int discarded);
+
   @Override
   void close();
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index 5c0995e41b3..cea8be7f10e 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -195,6 +195,17 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
 
     }
 
+    @Override
+    public void readVectoredOperationStarted(int numIncomingRanges,
+                                             int numCombinedRanges) {
+
+    }
+
+    @Override
+    public void readVectoredBytesDiscarded(int discarded) {
+
+    }
+
     @Override
     public void close() {
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index 18a727dcdce..84a90ba441a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -19,28 +19,41 @@
 package org.apache.hadoop.fs.contract.s3a;
 
 import java.io.EOFException;
+import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StoreStatisticNames;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 import org.apache.hadoop.test.LambdaTestUtils;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.test.MoreAsserts.assertEqual;
 
 public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ITestS3AContractVectoredRead.class);
+
   public ITestS3AContractVectoredRead(String bufferType) {
     super(bufferType);
   }
@@ -156,4 +169,162 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
     List<FileRange> fileRanges = getSampleSameRanges();
     verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class);
   }
+
+  /**
+   * As the minimum seek value is 4*1024, the first three ranges will be
+   * merged into one and the other two will remain as they are.
+   */
+  @Test
+  public void testNormalReadVsVectoredReadStatsCollection() throws Exception {
+    FileSystem fs = getTestFileSystemWithReadAheadDisabled();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+    fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+    FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+    CompletableFuture<FSDataInputStream> builder =
+            fs.openFile(path(VECTORED_READ_FILE_NAME))
+                    .withFileStatus(fileStatus)
+                    .build();
+    try (FSDataInputStream in = builder.get()) {
+      in.readVectored(fileRanges, getAllocate());
+      validateVectoredReadResult(fileRanges, DATASET);
+      returnBuffersToPoolPostRead(fileRanges, getPool());
+
+      // audit the io statistics for this stream
+      IOStatistics st = in.getIOStatistics();
+      LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st));
+
+      // the vectored io operation must be tracked
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+              1);
+
+      // the vectored io operation was called with 5 input ranges.
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+              5);
+
+      // the 5 input ranges got combined into 3 as some of them are close together.
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+              3);
+
+      // 5944 bytes are discarded: the merged range reads 6244 bytes but only 300 were requested.
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+              5944);
+
+      verifyStatisticCounterValue(st,
+              StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+              3);
+
+      // read bytes should match the sum of requested length for each input range.
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_BYTES,
+              1424);
+
+    }
+
+    CompletableFuture<FSDataInputStream> builder1 =
+            fs.openFile(path(VECTORED_READ_FILE_NAME))
+                    .withFileStatus(fileStatus)
+                    .build();
+
+    try (FSDataInputStream in = builder1.get()) {
+      for (FileRange range : fileRanges) {
+        byte[] temp = new byte[range.getLength()];
+        in.readFully((int) range.getOffset(), temp, 0, range.getLength());
+      }
+
+      // audit the statistics for this stream
+      IOStatistics st = in.getIOStatistics();
+      LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st));
+
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+              0);
+
+      // all other counter values are consistent.
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+              0);
+      verifyStatisticCounterValue(st,
+              StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+              5);
+
+      // read bytes should match the sum of requested length for each input range.
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_BYTES,
+              1424);
+    }
+  }
+
+  @Test
+  public void testMultiVectoredReadStatsCollection() throws Exception {
+    FileSystem fs = getTestFileSystemWithReadAheadDisabled();
+    List<FileRange> ranges1 = getConsecutiveRanges();
+    List<FileRange> ranges2 = getConsecutiveRanges();
+    FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME));
+    CompletableFuture<FSDataInputStream> builder =
+            fs.openFile(path(VECTORED_READ_FILE_NAME))
+                    .withFileStatus(fileStatus)
+                    .build();
+    try (FSDataInputStream in = builder.get()) {
+      in.readVectored(ranges1, getAllocate());
+      in.readVectored(ranges2, getAllocate());
+      validateVectoredReadResult(ranges1, DATASET);
+      validateVectoredReadResult(ranges2, DATASET);
+      returnBuffersToPoolPostRead(ranges1, getPool());
+      returnBuffersToPoolPostRead(ranges2, getPool());
+
+      // audit the io statistics for this stream
+      IOStatistics st = in.getIOStatistics();
+
+      // 2 vectored io calls are made above.
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+              2);
+
+      // each of the 2 vectored io operations is called with 2 input ranges.
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
+              4);
+
+      // the 2 consecutive ranges are merged into 1 in each vectored io operation.
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES,
+              2);
+
+      // number of bytes discarded will be 0 as the ranges are consecutive.
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
+              0);
+      // only 2 http get requests will be made because the ranges in each range list
+      // are merged into 1 as they are consecutive.
+      verifyStatisticCounterValue(st,
+              StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
+              2);
+      // read bytes should match the sum of requested length for each input range.
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_BYTES,
+              2000);
+    }
+  }
+
+  private FileSystem getTestFileSystemWithReadAheadDisabled() throws IOException {
+    Configuration conf = getFileSystem().getConf();
+    // also resetting the min seek and max size values is important
+    // as this same test suite has tests which override these params.
+    S3ATestUtils.removeBaseAndBucketOverrides(conf,
+            Constants.READAHEAD_RANGE,
+            Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
+            Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE);
+    S3ATestUtils.disableFilesystemCaching(conf);
+    conf.setInt(Constants.READAHEAD_RANGE, 0);
+    return S3ATestUtils.createTestFileSystem(conf);
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 48cb52c5ac2..6162ed13123 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
 import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -69,6 +70,7 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.text.DateFormat;
@@ -1457,4 +1459,33 @@ public final class S3ATestUtils {
           + " in " + secrets);
     }
   }
+
+
+  /**
+   * Get the input stream statistics of an input stream.
+   * Raises an exception if the inner stream is not an S3A input stream.
+   * @param in wrapper
+   * @return the statistics for the inner stream
+   */
+  public static S3AInputStreamStatistics getInputStreamStatistics(
+          FSDataInputStream in) {
+    return getS3AInputStream(in).getS3AStreamStatistics();
+  }
+
+  /**
+   * Get the inner stream of an input stream.
+   * Raises an exception if the inner stream is not an S3A input stream.
+   * @param in wrapper
+   * @return the inner stream
+   * @throws AssertionError if the inner stream is of the wrong type
+   */
+  public static S3AInputStream getS3AInputStream(
+          FSDataInputStream in) {
+    InputStream inner = in.getWrappedStream();
+    if (inner instanceof S3AInputStream) {
+      return (S3AInputStream) inner;
+    } else {
+      throw new AssertionError("Not an S3AInputStream: " + inner);
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
index d73a938bcce..b8195cb9964 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
@@ -56,6 +56,8 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_RE
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getInputStreamStatistics;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMinimum;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMaximumStatistic;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
index d95b46b10d7..514c6cf8869 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
@@ -19,19 +19,14 @@
 package org.apache.hadoop.fs.s3a.scale;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
-import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3ATestConstants;
 import org.apache.hadoop.fs.s3a.Statistic;
-import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.InputStream;
-
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupGaugeStatistic;
 
@@ -154,34 +149,6 @@ public class S3AScaleTestBase extends AbstractS3ATestBase {
     return getTestTimeoutSeconds() * 1000;
   }
 
-  /**
-   * Get the input stream statistics of an input stream.
-   * Raises an exception if the inner stream is not an S3A input stream
-   * @param in wrapper
-   * @return the statistics for the inner stream
-   */
-  protected S3AInputStreamStatistics getInputStreamStatistics(
-      FSDataInputStream in) {
-    return getS3AInputStream(in).getS3AStreamStatistics();
-  }
-
-  /**
-   * Get the inner stream of an input stream.
-   * Raises an exception if the inner stream is not an S3A input stream
-   * @param in wrapper
-   * @return the inner stream
-   * @throws AssertionError if the inner stream is of the wrong type
-   */
-  protected S3AInputStream getS3AInputStream(
-      FSDataInputStream in) {
-    InputStream inner = in.getWrappedStream();
-    if (inner instanceof S3AInputStream) {
-      return (S3AInputStream) inner;
-    } else {
-      throw new AssertionError("Not an S3AInputStream: " + inner);
-    }
-  }
-
   /**
    * Get the gauge value of a statistic from the
    * IOStatistics of the filesystem. Raises an assertion if
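
For anyone wanting to inspect the new counters from client code rather than
from the test suite, below is a minimal, illustrative sketch (not part of
this commit). It uses only the APIs exercised by the patch above; the bucket
and object name are hypothetical and error handling is omitted.

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileRange;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.statistics.IOStatistics;
    import org.apache.hadoop.fs.statistics.StreamStatisticNames;

    import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;

    public class VectoredReadStatsExample {
      public static void main(String[] args) throws Exception {
        // hypothetical bucket/object, purely for illustration
        Path path = new Path("s3a://example-bucket/example-object");
        FileSystem fs = path.getFileSystem(new Configuration());

        // two ranges close enough that the minimum seek distance merges them
        List<FileRange> ranges = Arrays.asList(
            FileRange.createFileRange(8 * 1024, 100),
            FileRange.createFileRange(10 * 1024, 100));

        try (FSDataInputStream in = fs.openFile(path).build().get()) {
          in.readVectored(ranges, ByteBuffer::allocate);
          // each range carries a future which completes once its data is read
          for (FileRange range : ranges) {
            ByteBuffer data = range.getData().get();
            // ... consume the buffer ...
          }
          // dump all stream statistics, including the new vectored counters
          IOStatistics st = in.getIOStatistics();
          System.out.println(ioStatisticsToPrettyString(st));
          // or pull out a single counter, e.g. the vectored operation count
          Long ops = st.counters()
              .get(StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS);
          System.out.println("vectored operations: " + ops);
        }
      }
    }

As the close() javadoc in S3AInstrumentation notes, the per-stream statistics
are also merged into the filesystem's instrumentation when the stream closes,
so the counters remain visible in the filesystem-level IOStatistics afterwards.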


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org