Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/07/26 11:04:50 UTC

[hadoop] branch feature-HADOOP-18028-s3a-prefetch updated: HADOOP-18190. Collect IOStatistics during S3A prefetching (#4458)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch feature-HADOOP-18028-s3a-prefetch
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/feature-HADOOP-18028-s3a-prefetch by this push:
     new a964fbf42f0 HADOOP-18190. Collect IOStatistics during S3A prefetching (#4458)
a964fbf42f0 is described below

commit a964fbf42f056ccf5a22c9cb0b07f241a3854f92
Author: ahmarsuhail <ah...@gmail.com>
AuthorDate: Tue Jul 26 12:04:30 2022 +0100

    HADOOP-18190. Collect IOStatistics during S3A prefetching (#4458)
    
    
    This adds IOStatistics collection to the S3PrefetchingInputStream class, with
    new statistic names in StreamStatisticNames.
    
    This stream is not (yet) IOStatisticsContext aware.
    
    
    Contributed by Ahmar Suhail
---
 .../hadoop/fs/statistics/StreamStatisticNames.java | 40 ++++++++++++
 .../org/apache/hadoop/fs/common/BufferPool.java    | 24 +++++--
 .../hadoop/fs/common/CachingBlockManager.java      | 64 ++++++++++++++----
 .../hadoop/fs/common/PrefetchingStatistics.java    | 67 +++++++++++++++++++
 .../hadoop/fs/common/SingleFilePerBlockCache.java  | 14 +++-
 .../apache/hadoop/fs/s3a/S3AInstrumentation.java   | 61 ++++++++++++++++-
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   | 12 ++++
 .../hadoop/fs/s3a/read/S3CachingBlockManager.java  |  7 +-
 .../hadoop/fs/s3a/read/S3CachingInputStream.java   | 15 ++++-
 .../org/apache/hadoop/fs/s3a/read/S3Reader.java    | 17 +++--
 .../s3a/statistics/S3AInputStreamStatistics.java   |  3 +-
 .../statistics/impl/EmptyS3AStatisticsContext.java | 36 ++++++++++
 .../fs/common/EmptyPrefetchingStatistics.java      | 76 ++++++++++++++++++++++
 .../apache/hadoop/fs/common/TestBlockCache.java    | 11 +++-
 .../apache/hadoop/fs/common/TestBufferPool.java    | 23 ++++---
 .../fs/s3a/ITestS3PrefetchingInputStream.java      | 24 ++++++-
 .../java/org/apache/hadoop/fs/s3a/read/Fakes.java  |  5 +-
 .../fs/s3a/read/TestS3CachingBlockManager.java     | 35 ++++++----
 18 files changed, 471 insertions(+), 63 deletions(-)
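
As a minimal sketch of how a caller surfaces the statistics this change collects
-- assuming a build from this feature branch with prefetching enabled; the bucket,
path and read size below are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.statistics.IOStatistics;

    import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
    import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;

    public class PrefetchStatsDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(PREFETCH_ENABLED_KEY, true);  // feature-branch option
        Path path = new Path("s3a://example-bucket/large-file.bin");  // illustrative
        try (FileSystem fs = path.getFileSystem(conf);
             FSDataInputStream in = fs.open(path)) {
          byte[] buffer = new byte[1024 * 1024];
          in.readFully(0, buffer);
          // FSDataInputStream is an IOStatisticsSource; snapshot and print.
          IOStatistics stats = in.getIOStatistics();
          System.out.println(ioStatisticsToPrettyString(stats));
        }
      }
    }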

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index ca755f08419..5a55ca522d6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -387,6 +387,46 @@ public final class StreamStatisticNames {
   public static final String BLOCKS_RELEASED
       = "blocks_released";
 
+  /**
+   * Total number of prefetching operations executed.
+   */
+  public static final String STREAM_READ_PREFETCH_OPERATIONS
+      = "stream_read_prefetch_operations";
+
+  /**
+   * Total number of blocks in the disk cache.
+   */
+  public static final String STREAM_READ_BLOCKS_IN_FILE_CACHE
+      = "stream_read_blocks_in_cache";
+
+  /**
+   * Total number of active prefetch operations.
+   */
+  public static final String STREAM_READ_ACTIVE_PREFETCH_OPERATIONS
+      = "stream_read_active_prefetch_operations";
+
+  /**
+   * Total bytes of memory in use by this input stream.
+   */
+  public static final String STREAM_READ_ACTIVE_MEMORY_IN_USE
+      = "stream_read_active_memory_in_use";
+
+  /**
+   * Count/duration of reading a remote block.
+   *
+   * Value: {@value}.
+   */
+  public static final String STREAM_READ_REMOTE_BLOCK_READ
+      = "stream_read_block_read";
+
+  /**
+   * Count/duration of acquiring a buffer and reading into it.
+   *
+   * Value: {@value}.
+   */
+  public static final String STREAM_READ_BLOCK_ACQUIRE_AND_READ
+      = "stream_read_block_acquire_read";
+
   private StreamStatisticNames() {
   }
 
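Once published through the stream, these names are plain map keys in an
IOStatistics snapshot. A short sketch, assuming in is an open prefetching stream
as in the earlier example:

    IOStatistics stats = in.getIOStatistics();
    // counters() and gauges() return Map<String, Long> views of the snapshot.
    Long prefetches = stats.counters()
        .get(StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS);
    Long cachedBlocks = stats.gauges()
        .get(StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE);
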
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java
index b151ed439af..bd7da11ddd8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java
@@ -31,6 +31,8 @@ import java.util.concurrent.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * Manages a fixed pool of {@code ByteBuffer} instances.
  *
@@ -56,26 +58,32 @@ public class BufferPool implements Closeable {
   // Allows associating metadata to each buffer in the pool.
   private Map<BufferData, ByteBuffer> allocated;
 
+  private PrefetchingStatistics prefetchingStatistics;
+
   /**
    * Initializes a new instance of the {@code BufferPool} class.
    *
   * @param size number of buffers in this pool.
    * @param bufferSize size in bytes of each buffer.
+   * @param prefetchingStatistics statistics for this stream.
    *
    * @throws IllegalArgumentException if size is zero or negative.
    * @throws IllegalArgumentException if bufferSize is zero or negative.
    */
-  public BufferPool(int size, int bufferSize) {
+  public BufferPool(int size, int bufferSize, PrefetchingStatistics prefetchingStatistics) {
     Validate.checkPositiveInteger(size, "size");
     Validate.checkPositiveInteger(bufferSize, "bufferSize");
 
     this.size = size;
     this.bufferSize = bufferSize;
     this.allocated = new IdentityHashMap<BufferData, ByteBuffer>();
+    this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
     this.pool = new BoundedResourcePool<ByteBuffer>(size) {
         @Override
         public ByteBuffer createNew() {
-          return ByteBuffer.allocate(bufferSize);
+          ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+          prefetchingStatistics.memoryAllocated(bufferSize);
+          return buffer;
         }
       };
   }
@@ -236,11 +244,15 @@ public class BufferPool implements Closeable {
       }
     }
 
-    this.pool.close();
-    this.pool = null;
+    int currentPoolSize = pool.numCreated();
+
+    pool.close();
+    pool = null;
+
+    allocated.clear();
+    allocated = null;
 
-    this.allocated.clear();
-    this.allocated = null;
+    prefetchingStatistics.memoryFreed(currentPoolSize * bufferSize);
   }
 
   // For debugging purposes.
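
The accounting pattern above -- count bytes as each buffer is created, then free
numCreated * bufferSize on close -- reduces to the following sketch. Illustrative
only (AccountingPool is not a Hadoop class; it assumes java.nio.ByteBuffer and a
static import of java.util.Objects.requireNonNull):

    // Same gauge accounting as BufferPool, in miniature.
    class AccountingPool {
      private final int bufferSize;
      private int numCreated;
      private final PrefetchingStatistics stats;

      AccountingPool(int bufferSize, PrefetchingStatistics stats) {
        this.bufferSize = bufferSize;
        this.stats = requireNonNull(stats);
      }

      ByteBuffer createNew() {
        numCreated++;
        stats.memoryAllocated(bufferSize);   // gauge rises per buffer created
        return ByteBuffer.allocate(bufferSize);
      }

      void close() {
        // Everything ever created is released; the gauge returns to zero.
        stats.memoryFreed(numCreated * bufferSize);
      }
    }
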
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java
index 1bb439a9997..1207d3d0318 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.common;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -31,6 +33,10 @@ import java.util.function.Supplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.statistics.DurationTracker;
+
+import static java.util.Objects.requireNonNull;
+
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 
 /**
@@ -70,33 +76,37 @@ public abstract class CachingBlockManager extends BlockManager {
   // Once set to true, any further caching requests will be ignored.
   private final AtomicBoolean cachingDisabled;
 
+  private final PrefetchingStatistics prefetchingStatistics;
+
   /**
    * Constructs an instance of a {@code CachingBlockManager}.
    *
    * @param futurePool asynchronous tasks are performed in this pool.
    * @param blockData information about each block of the underlying file.
    * @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
+   * @param prefetchingStatistics statistics for this stream.
    *
-   * @throws IllegalArgumentException if futurePool is null.
    * @throws IllegalArgumentException if bufferPoolSize is zero or negative.
    */
   public CachingBlockManager(
       ExecutorServiceFuturePool futurePool,
       BlockData blockData,
-      int bufferPoolSize) {
+      int bufferPoolSize,
+      PrefetchingStatistics prefetchingStatistics) {
     super(blockData);
 
-    Validate.checkNotNull(futurePool, "futurePool");
     Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
 
-    this.futurePool = futurePool;
+    this.futurePool = requireNonNull(futurePool);
     this.bufferPoolSize = bufferPoolSize;
     this.numCachingErrors = new AtomicInteger();
     this.numReadErrors = new AtomicInteger();
     this.cachingDisabled = new AtomicBoolean();
+    this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
 
     if (this.getBlockData().getFileSize() > 0) {
-      this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize());
+      this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
+          this.prefetchingStatistics);
       this.cache = this.createCache();
     }
 
@@ -249,7 +259,7 @@ public abstract class CachingBlockManager extends BlockManager {
       }
 
       BlockOperations.Operation op = this.ops.requestPrefetch(blockNumber);
-      PrefetchTask prefetchTask = new PrefetchTask(data, this);
+      PrefetchTask prefetchTask = new PrefetchTask(data, this, Instant.now());
       Future<Void> prefetchFuture = this.futurePool.executeFunction(prefetchTask);
       data.setPrefetch(prefetchFuture);
       this.ops.end(op);
@@ -279,8 +289,10 @@ public abstract class CachingBlockManager extends BlockManager {
     }
   }
 
-  private void prefetch(BufferData data) throws IOException {
+  private void prefetch(BufferData data, Instant taskQueuedStartTime) throws IOException {
     synchronized (data) {
+      prefetchingStatistics.executorAcquired(
+          Duration.between(taskQueuedStartTime, Instant.now()));
       this.readBlock(
           data,
           true,
@@ -297,6 +309,7 @@ public abstract class CachingBlockManager extends BlockManager {
     }
 
     BlockOperations.Operation op = null;
+    DurationTracker tracker = null;
 
     synchronized (data) {
       try {
@@ -318,6 +331,7 @@ public abstract class CachingBlockManager extends BlockManager {
         }
 
         if (isPrefetch) {
+          tracker = prefetchingStatistics.prefetchOperationStarted();
           op = this.ops.prefetch(data.getBlockNumber());
         } else {
           op = this.ops.getRead(data.getBlockNumber());
@@ -333,6 +347,11 @@ public abstract class CachingBlockManager extends BlockManager {
       } catch (Exception e) {
         String message = String.format("error during readBlock(%s)", data.getBlockNumber());
         LOG.error(message, e);
+
+        if (isPrefetch && tracker != null) {
+          tracker.failed();
+        }
+
         this.numReadErrors.incrementAndGet();
         data.setDone();
         throw e;
@@ -340,6 +359,13 @@ public abstract class CachingBlockManager extends BlockManager {
         if (op != null) {
           this.ops.end(op);
         }
+
+        if (isPrefetch) {
+          prefetchingStatistics.prefetchOperationCompleted();
+          if (tracker != null) {
+            tracker.close();
+          }
+        }
       }
     }
   }
@@ -350,16 +376,18 @@ public abstract class CachingBlockManager extends BlockManager {
   private static class PrefetchTask implements Supplier<Void> {
     private final BufferData data;
     private final CachingBlockManager blockManager;
+    private final Instant taskQueuedStartTime;
 
-    PrefetchTask(BufferData data, CachingBlockManager blockManager) {
+    PrefetchTask(BufferData data, CachingBlockManager blockManager, Instant taskQueuedStartTime) {
       this.data = data;
       this.blockManager = blockManager;
+      this.taskQueuedStartTime = taskQueuedStartTime;
     }
 
     @Override
     public Void get() {
       try {
-        this.blockManager.prefetch(data);
+        this.blockManager.prefetch(data, taskQueuedStartTime);
       } catch (Exception e) {
         LOG.error("error during prefetch", e);
       }
@@ -420,14 +448,18 @@ public abstract class CachingBlockManager extends BlockManager {
         blockFuture = cf;
       }
 
-      CachePutTask task = new CachePutTask(data, blockFuture, this);
+      CachePutTask task = new CachePutTask(data, blockFuture, this, Instant.now());
       Future<Void> actionFuture = this.futurePool.executeFunction(task);
       data.setCaching(actionFuture);
       this.ops.end(op);
     }
   }
 
-  private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture) {
+  private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
+      Instant taskQueuedStartTime) {
+    prefetchingStatistics.executorAcquired(
+        Duration.between(taskQueuedStartTime, Instant.now()));
+
     if (this.closed) {
       return;
     }
@@ -493,7 +525,7 @@ public abstract class CachingBlockManager extends BlockManager {
   }
 
   protected BlockCache createCache() {
-    return new SingleFilePerBlockCache();
+    return new SingleFilePerBlockCache(prefetchingStatistics);
   }
 
   protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
@@ -513,18 +545,22 @@ public abstract class CachingBlockManager extends BlockManager {
     // Block manager that manages this block.
     private final CachingBlockManager blockManager;
 
+    private final Instant taskQueuedStartTime;
+
     CachePutTask(
         BufferData data,
         Future<Void> blockFuture,
-        CachingBlockManager blockManager) {
+        CachingBlockManager blockManager,
+        Instant taskQueuedStartTime) {
       this.data = data;
       this.blockFuture = blockFuture;
       this.blockManager = blockManager;
+      this.taskQueuedStartTime = taskQueuedStartTime;
     }
 
     @Override
     public Void get() {
-      this.blockManager.addToCacheAndRelease(this.data, this.blockFuture);
+      this.blockManager.addToCacheAndRelease(this.data, this.blockFuture, taskQueuedStartTime);
       return null;
     }
   }
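
The Instant stamped into each task at submission is what turns
ACTION_EXECUTOR_ACQUIRED into a queue-wait measurement: the duration is computed
only when the executor finally runs the task. The same idea in standalone form:

    import java.time.Duration;
    import java.time.Instant;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class QueueWaitDemo {
      public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Instant queuedAt = Instant.now();   // stamped at submission, like PrefetchTask
        pool.submit(() -> {
          // Measured only once the executor actually runs the task.
          Duration timeInQueue = Duration.between(queuedAt, Instant.now());
          System.out.println("time in queue: " + timeInQueue.toMillis() + " ms");
        });
        pool.shutdown();
      }
    }
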
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/PrefetchingStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/PrefetchingStatistics.java
new file mode 100644
index 00000000000..b1894f97696
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/PrefetchingStatistics.java
@@ -0,0 +1,67 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements. See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership. The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License. You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+
+package org.apache.hadoop.fs.common;
+
+import java.time.Duration;
+
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
+public interface PrefetchingStatistics extends IOStatisticsSource {
+
+  /**
+   * A prefetch operation has started.
+   * @return duration tracker
+   */
+  DurationTracker prefetchOperationStarted();
+
+  /**
+   * A block has been saved to the file cache.
+   */
+  void blockAddedToFileCache();
+
+  /**
+   * A block has been removed from the file cache.
+   */
+  void blockRemovedFromFileCache();
+
+  /**
+   * A prefetch operation has completed.
+   */
+  void prefetchOperationCompleted();
+
+  /**
+   * An executor has been acquired, either for prefetching or caching.
+   * @param timeInQueue time taken to acquire an executor.
+   */
+  void executorAcquired(Duration timeInQueue);
+
+  /**
+   * A new buffer has been added to the buffer pool.
+   * @param size size of the new buffer
+   */
+  void memoryAllocated(int size);
+
+  /**
+   * Previously allocated memory has been freed.
+   * @param size size of memory freed.
+   */
+  void memoryFreed(int size);
+}
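
The tracker returned by prefetchOperationStarted() follows the standard
DurationTracker contract, as the readBlock() change above applies it: mark
failed() on error, always close(). Reduced to its skeleton (doPrefetch() is an
illustrative name for the timed work):

    DurationTracker tracker = prefetchingStatistics.prefetchOperationStarted();
    try {
      doPrefetch();                // the timed work
    } catch (Exception e) {
      tracker.failed();            // duration is recorded against the failure statistics
      throw e;
    } finally {
      prefetchingStatistics.prefetchOperationCompleted();
      tracker.close();             // stops the clock, updates count/min/mean/max
    }
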
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java
index 0f3d59b6cb9..7252c294bee 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java
@@ -42,6 +42,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * Provides functionality necessary for caching blocks of data read from FileSystem.
  * Each cache block is stored on the local disk as a separate file.
@@ -58,6 +60,8 @@ public class SingleFilePerBlockCache implements BlockCache {
 
   private boolean closed;
 
+  private final PrefetchingStatistics prefetchingStatistics;
+
   // Cache entry.
   // Each block is stored as a separate file.
   private static class Entry {
@@ -81,7 +85,13 @@ public class SingleFilePerBlockCache implements BlockCache {
     }
   }
 
-  public SingleFilePerBlockCache() {
+  /**
+   * Constructs an instance of a {@code SingleFilePerBlockCache}.
+   *
+   * @param prefetchingStatistics statistics for this stream.
+   */
+  public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
+    this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
   }
 
   /**
@@ -184,6 +194,7 @@ public class SingleFilePerBlockCache implements BlockCache {
     }
 
     this.writeFile(blockFilePath, buffer);
+    this.prefetchingStatistics.blockAddedToFileCache();
     long checksum = BufferData.getChecksum(buffer);
     Entry entry = new Entry(blockNumber, blockFilePath, buffer.limit(), checksum);
     this.blocks.put(blockNumber, entry);
@@ -221,6 +232,7 @@ public class SingleFilePerBlockCache implements BlockCache {
     for (Entry entry : this.blocks.values()) {
       try {
         Files.deleteIfExists(entry.path);
+        this.prefetchingStatistics.blockRemovedFromFileCache();
         numFilesDeleted++;
       } catch (IOException e) {
         // Ignore while closing so that we can delete as many cache files as possible.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index eec63667201..22c28950067 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -837,11 +837,19 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
               StreamStatisticNames.STREAM_READ_TOTAL_BYTES,
               StreamStatisticNames.STREAM_READ_UNBUFFERED,
               StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES)
-          .withGauges(STREAM_READ_GAUGE_INPUT_POLICY)
+          .withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
+              STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
+              STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
+              STREAM_READ_ACTIVE_MEMORY_IN_USE.getSymbol()
+              )
           .withDurationTracking(ACTION_HTTP_GET_REQUEST,
+              ACTION_EXECUTOR_ACQUIRED,
               StoreStatisticNames.ACTION_FILE_OPENED,
               StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
-              StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED)
+              StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED,
+              StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS,
+              StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ,
+              StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ)
           .build();
       setIOStatistics(st);
       aborted = st.getCounterReference(
@@ -902,6 +910,18 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
       return incCounter(name, value);
     }
 
+    /**
+     * Increment the Statistic gauge and the local IOStatistics
+     * equivalent.
+     * @param statistic statistic
+     * @param v value.
+     * @return local IOStatistic value
+     */
+    private long incAllGauges(Statistic statistic, long v) {
+      incrementGauge(statistic, v);
+      return incGauge(statistic.getSymbol(), v);
+    }
+
     /**
      * {@inheritDoc}.
      * Increments the number of seek operations,
@@ -1017,6 +1037,12 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
       }
     }
 
+    @Override
+    public void executorAcquired(Duration timeInQueue) {
+      // update the duration fields in the IOStatistics.
+      localIOStatistics().addTimedOperation(ACTION_EXECUTOR_ACQUIRED, timeInQueue);
+    }
+
     /**
      * {@code close()} merges the stream statistics into the filesystem's
      * instrumentation instance.
@@ -1281,6 +1307,37 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
           ? StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED
           : StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED);
     }
+
+    @Override
+    public DurationTracker prefetchOperationStarted() {
+      incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, 1);
+      return trackDuration(StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS);
+    }
+
+    @Override
+    public void blockAddedToFileCache() {
+      incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, 1);
+    }
+
+    @Override
+    public void blockRemovedFromFileCache() {
+      incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, -1);
+    }
+
+    @Override
+    public void prefetchOperationCompleted() {
+      incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1);
+    }
+
+    @Override
+    public void memoryAllocated(int size) {
+      incAllGauges(STREAM_READ_ACTIVE_MEMORY_IN_USE, size);
+    }
+
+    @Override
+    public void memoryFreed(int size) {
+      incAllGauges(STREAM_READ_ACTIVE_MEMORY_IN_USE, -size);
+    }
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 86cb18076cc..b154447c607 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -357,6 +357,18 @@ public enum Statistic {
       StreamStatisticNames.STREAM_READ_TOTAL_BYTES,
       "Total count of bytes read from an input stream",
       TYPE_COUNTER),
+  STREAM_READ_BLOCKS_IN_FILE_CACHE(
+      StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE,
+      "Gauge of blocks in disk cache",
+      TYPE_GAUGE),
+  STREAM_READ_ACTIVE_PREFETCH_OPERATIONS(
+      StreamStatisticNames.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS,
+      "Gauge of active prefetches",
+      TYPE_GAUGE),
+  STREAM_READ_ACTIVE_MEMORY_IN_USE(
+      StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE,
+      "Gauge of active memory in use",
+      TYPE_GAUGE),
 
   /* Stream Write statistics */
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java
index 674a5ccbdd8..1c058087f31 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.common.BlockData;
 import org.apache.hadoop.fs.common.CachingBlockManager;
 import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 
 /**
  * Provides access to S3 file one block at a time.
@@ -48,6 +49,7 @@ public class S3CachingBlockManager extends CachingBlockManager {
    * @param reader reader that reads from S3 file.
    * @param blockData information about each block of the S3 file.
    * @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
+   * @param streamStatistics statistics for this stream.
    *
    * @throws IllegalArgumentException if reader is null.
    */
@@ -55,8 +57,9 @@ public class S3CachingBlockManager extends CachingBlockManager {
       ExecutorServiceFuturePool futurePool,
       S3Reader reader,
       BlockData blockData,
-      int bufferPoolSize) {
-    super(futurePool, blockData, bufferPoolSize);
+      int bufferPoolSize,
+      S3AInputStreamStatistics streamStatistics) {
+    super(futurePool, blockData, bufferPoolSize, streamStatistics);
 
     Validate.checkNotNull(reader, "reader");
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
index b6c6bf39988..b00119ac4e1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
@@ -33,6 +33,9 @@ import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
+
 /**
  * Provides an {@code InputStream} that allows reading from an S3 file.
  * Prefetched blocks are cached to local disk if a seek away from the
@@ -120,8 +123,10 @@ public class S3CachingInputStream extends S3InputStream {
 
   @Override
   public void close() throws IOException {
-    super.close();
+    // Close the BlockManager first, cancelling active prefetches,
+    // deleting cached files and freeing memory used by buffer pool.
     this.blockManager.close();
+    super.close();
     LOG.info("closed: {}", this.getName());
   }
 
@@ -171,7 +176,10 @@ public class S3CachingInputStream extends S3InputStream {
       }
     }
 
-    BufferData data = this.blockManager.get(toBlockNumber);
+    BufferData data = invokeTrackingDuration(
+        this.getS3AStreamStatistics().trackDuration(STREAM_READ_BLOCK_ACQUIRE_AND_READ),
+        () -> this.blockManager.get(toBlockNumber));
+
     this.getFilePosition().setData(data, startOffset, readPos);
     return true;
   }
@@ -193,6 +201,7 @@ public class S3CachingInputStream extends S3InputStream {
       S3Reader reader,
       BlockData blockData,
       int bufferPoolSize) {
-    return new S3CachingBlockManager(futurePool, reader, blockData, bufferPoolSize);
+    return new S3CachingBlockManager(futurePool, reader, blockData, bufferPoolSize,
+        this.getS3AStreamStatistics());
   }
 }
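
invokeTrackingDuration() runs the supplied operation while the given tracker
times it, marking failure and closing the tracker automatically. A minimal
fragment, assuming statistics is any DurationTrackerFactory and readSomething()
stands in for any IOException-raising call:

    // Times one invocation; on exception the tracker is marked failed,
    // and it is closed either way.
    String result = invokeTrackingDuration(
        statistics.trackDuration("action_read"),   // illustrative statistic name
        () -> readSomething());
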
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java
index 89e3618be53..19ab4f6961d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java
@@ -31,6 +31,10 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.common.Validate;
 import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
 
 /**
  * Provides functionality to read S3 file one block at a time.
@@ -47,6 +51,8 @@ public class S3Reader implements Closeable {
   // Set to true by close().
   private volatile boolean closed;
 
+  private final S3AInputStreamStatistics streamStatistics;
+
   /**
    * Constructs an instance of {@link S3Reader}.
    *
@@ -58,6 +64,7 @@ public class S3Reader implements Closeable {
     Validate.checkNotNull(s3File, "s3File");
 
     this.s3File = s3File;
+    this.streamStatistics = this.s3File.getStatistics();
   }
 
   /**
@@ -95,26 +102,24 @@ public class S3Reader implements Closeable {
   private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
       throws IOException {
 
-    this.s3File.getStatistics().readOperationStarted(offset, size);
+    this.streamStatistics.readOperationStarted(offset, size);
     Invoker invoker = this.s3File.getReadInvoker();
 
-    int invokerResponse = invoker.retry(
-        "read", this.s3File.getPath(), true,
-        () -> {
+    int invokerResponse = invoker.retry("read", this.s3File.getPath(), true,
+        trackDurationOfOperation(streamStatistics, STREAM_READ_REMOTE_BLOCK_READ, () -> {
           try {
             this.readOneBlock(buffer, offset, size);
           } catch (EOFException e) {
             // the base implementation swallows EOFs.
             return -1;
           } catch (SocketTimeoutException e) {
-            this.s3File.getStatistics().readException();
             throw e;
           } catch (IOException e) {
             this.s3File.getStatistics().readException();
             throw e;
           }
           return 0;
-        });
+        }));
 
     int numBytesRead = buffer.position();
     buffer.limit(numBytesRead);
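
By contrast, trackDurationOfOperation() wraps the operation without invoking it,
which is why it can sit inside invoker.retry() above: each retry attempt gets its
own timed sample. Sketched under the same assumptions (readSomething() is
illustrative; invoker.retry() accepts the wrapped CallableRaisingIOE, as the
change above relies on):

    CallableRaisingIOE<Integer> timedRead = trackDurationOfOperation(
        streamStatistics, STREAM_READ_REMOTE_BLOCK_READ,
        () -> readSomething());
    int bytes = invoker.retry("read", path, true, timedRead);
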
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
index 539af2bde36..e74a6d59a86 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a.statistics;
 
+import org.apache.hadoop.fs.common.PrefetchingStatistics;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 
 /**
@@ -26,7 +27,7 @@ import org.apache.hadoop.fs.statistics.DurationTracker;
  * It also contains getters for tests.
  */
 public interface S3AInputStreamStatistics extends AutoCloseable,
-    S3AStatisticInterface {
+    S3AStatisticInterface, PrefetchingStatistics {
 
   /**
    * Seek backwards, incrementing the seek and backward seek counters.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index f618270798e..1f9d75a9f78 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -208,6 +208,41 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
 
     }
 
+    @Override
+    public DurationTracker prefetchOperationStarted() {
+      return stubDurationTracker();
+    }
+
+    @Override
+    public void prefetchOperationCompleted() {
+
+    }
+
+    @Override
+    public void blockAddedToFileCache() {
+
+    }
+
+    @Override
+    public void blockRemovedFromFileCache() {
+
+    }
+
+    @Override
+    public void executorAcquired(Duration timeInQueue) {
+
+    }
+
+    @Override
+    public void memoryAllocated(int size) {
+
+    }
+
+    @Override
+    public void memoryFreed(int size) {
+
+    }
+
     /**
      * Return an IO statistics instance.
      * @return an empty IO statistics instance.
@@ -341,6 +376,7 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
     public DurationTracker initiateInnerStreamClose(final boolean abort) {
       return stubDurationTracker();
     }
+
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/EmptyPrefetchingStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/EmptyPrefetchingStatistics.java
new file mode 100644
index 00000000000..ce13d2d9929
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/EmptyPrefetchingStatistics.java
@@ -0,0 +1,76 @@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements. See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership. The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License. You may obtain a copy of the License at
+  *
+  *   http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied. See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+
+package org.apache.hadoop.fs.common;
+
+import java.time.Duration;
+
+import org.apache.hadoop.fs.statistics.DurationTracker;
+
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker;
+
+public final class EmptyPrefetchingStatistics implements PrefetchingStatistics {
+
+  private static final EmptyPrefetchingStatistics EMPTY_PREFETCHING_STATISTICS =
+      new EmptyPrefetchingStatistics();
+
+  private EmptyPrefetchingStatistics() {
+  }
+
+  public static EmptyPrefetchingStatistics getInstance() {
+    return EMPTY_PREFETCHING_STATISTICS;
+  }
+
+  @Override
+  public DurationTracker prefetchOperationStarted() {
+    return stubDurationTracker();
+  }
+
+  @Override
+  public void blockAddedToFileCache() {
+
+  }
+
+  @Override
+  public void blockRemovedFromFileCache() {
+
+  }
+
+  @Override
+  public void prefetchOperationCompleted() {
+
+  }
+
+  @Override
+  public void executorAcquired(Duration timeInQueue) {
+
+  }
+
+  @Override
+  public void memoryAllocated(int size) {
+
+  }
+
+  @Override
+  public void memoryFreed(int size) {
+
+  }
+
+}
+
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java
index b1344c6972c..c402673a49d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 
 import org.junit.Test;
 
+import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
 import static org.junit.Assert.assertEquals;
@@ -38,7 +39,8 @@ public class TestBlockCache extends AbstractHadoopTestBase {
   @Test
   public void testArgChecks() throws Exception {
     // Should not throw.
-    BlockCache cache = new SingleFilePerBlockCache();
+    BlockCache cache =
+        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance());
 
     ByteBuffer buffer = ByteBuffer.allocate(16);
 
@@ -47,12 +49,17 @@ public class TestBlockCache extends AbstractHadoopTestBase {
         IllegalArgumentException.class,
         "'buffer' must not be null",
         () -> cache.put(42, null));
+
+    ExceptionAsserts.assertThrows(
+        NullPointerException.class,
+        () -> new SingleFilePerBlockCache(null));
   }
 
 
   @Test
   public void testPutAndGet() throws Exception {
-    BlockCache cache = new SingleFilePerBlockCache();
+    BlockCache cache =
+        new SingleFilePerBlockCache(new EmptyS3AStatisticsContext().newInputStreamStatistics());
 
     ByteBuffer buffer1 = ByteBuffer.allocate(BUFFER_SIZE);
     for (byte i = 0; i < BUFFER_SIZE; i++) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java
index 43be295cb38..c9134f1e251 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.common;
 
 import org.junit.Test;
 
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
 import static org.junit.Assert.assertEquals;
@@ -32,33 +34,38 @@ public class TestBufferPool extends AbstractHadoopTestBase {
 
   private static final int POOL_SIZE = 2;
   private static final int BUFFER_SIZE = 10;
-
+  private final S3AInputStreamStatistics s3AInputStreamStatistics =
+      new EmptyS3AStatisticsContext().newInputStreamStatistics();
 
   @Test
   public void testArgChecks() throws Exception {
     // Should not throw.
-    BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE);
+    BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, s3AInputStreamStatistics);
 
     // Verify it throws correctly.
     ExceptionAsserts.assertThrows(
         IllegalArgumentException.class,
         "'size' must be a positive integer",
-        () -> new BufferPool(0, 10));
+        () -> new BufferPool(0, 10, s3AInputStreamStatistics));
 
     ExceptionAsserts.assertThrows(
         IllegalArgumentException.class,
         "'size' must be a positive integer",
-        () -> new BufferPool(-1, 10));
+        () -> new BufferPool(-1, 10, s3AInputStreamStatistics));
 
     ExceptionAsserts.assertThrows(
         IllegalArgumentException.class,
         "'bufferSize' must be a positive integer",
-        () -> new BufferPool(10, 0));
+        () -> new BufferPool(10, 0, s3AInputStreamStatistics));
 
     ExceptionAsserts.assertThrows(
         IllegalArgumentException.class,
         "'bufferSize' must be a positive integer",
-        () -> new BufferPool(1, -10));
+        () -> new BufferPool(1, -10, s3AInputStreamStatistics));
+
+    ExceptionAsserts.assertThrows(
+        NullPointerException.class,
+        () -> new BufferPool(1, 10, null));
 
     ExceptionAsserts.assertThrows(
         IllegalArgumentException.class,
@@ -78,7 +85,7 @@ public class TestBufferPool extends AbstractHadoopTestBase {
 
   @Test
   public void testGetAndRelease() {
-    BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE);
+    BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, s3AInputStreamStatistics);
     assertInitialState(pool, POOL_SIZE);
 
     int count = 0;
@@ -125,7 +132,7 @@ public class TestBufferPool extends AbstractHadoopTestBase {
   private void testReleaseHelper(BufferData.State stateBeforeRelease, boolean expectThrow)
       throws Exception {
 
-    BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE);
+    BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, s3AInputStreamStatistics);
     assertInitialState(pool, POOL_SIZE);
 
     BufferData data = this.acquire(pool, 1);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java
index 9b831cb3b84..f46e93e1084 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java
@@ -39,6 +39,7 @@ import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 
 /**
@@ -104,10 +105,11 @@ public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
   @Test
   public void testReadLargeFileFully() throws Throwable {
     describe("read a large file fully, uses S3CachingInputStream");
+    IOStatistics ioStats;
     openFS();
 
     try (FSDataInputStream in = largeFileFS.open(largeFile)) {
-      IOStatistics ioStats = in.getIOStatistics();
+      ioStats = in.getIOStatistics();
 
       byte[] buffer = new byte[S_1M * 10];
       long bytesRead = 0;
@@ -115,20 +117,29 @@ public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
       while (bytesRead < largeFileSize) {
         in.readFully(buffer, 0, (int) Math.min(buffer.length, largeFileSize - bytesRead));
         bytesRead += buffer.length;
+        // Blocks are fully read, no blocks should be cached
+        verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE,
+            0);
       }
 
+      // Assert that first block is read synchronously, following blocks are prefetched
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS,
+          numBlocks - 1);
       verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks);
       verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks);
     }
+    // Verify that once stream is closed, all memory is freed
+    verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
   }
 
   @Test
   public void testRandomReadLargeFile() throws Throwable {
     describe("random read on a large file, uses S3CachingInputStream");
+    IOStatistics ioStats;
     openFS();
 
     try (FSDataInputStream in = largeFileFS.open(largeFile)) {
-      IOStatistics ioStats = in.getIOStatistics();
+      ioStats = in.getIOStatistics();
 
       byte[] buffer = new byte[blockSize];
 
@@ -141,7 +152,13 @@ public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
 
       verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2);
       verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 2);
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 1);
+      // block 0 is cached when we seek to block 1, block 1 is cached as it is being prefetched
+      // when we seek out of block 0, see cancelPrefetches()
+      verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 2);
     }
+    verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
+    verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
   }
 
   @Test
@@ -163,6 +180,9 @@ public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
 
       verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 1);
       verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 1);
+      verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0);
+      // The buffer pool is not used
+      verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
     }
   }
 
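The two assertion helpers used throughout these tests compare a named statistic
in the snapshot against an expected value, failing with a message naming the
statistic. The pattern in miniature:

    IOStatistics ioStats = in.getIOStatistics();
    // counter: cumulative count since the stream opened
    verifyStatisticCounterValue(ioStats,
        StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 1);
    // gauge: current level; zero here means the pool returned all memory
    verifyStatisticGaugeValue(ioStats,
        StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
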
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java
index 7e91b6830d5..d2a045e335e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.fs.s3a.S3AInputPolicy;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
-import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
 import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
@@ -284,6 +283,7 @@ public final class Fakes {
     private final int writeDelay;
 
     public TestS3FilePerBlockCache(int readDelay, int writeDelay) {
+      super(new EmptyS3AStatisticsContext().newInputStreamStatistics());
       this.files = new ConcurrentHashMap<>();
       this.readDelay = readDelay;
       this.writeDelay = writeDelay;
@@ -337,7 +337,8 @@ public final class Fakes {
         S3Reader reader,
         BlockData blockData,
         int bufferPoolSize) {
-      super(futurePool, reader, blockData, bufferPoolSize);
+      super(futurePool, reader, blockData, bufferPoolSize,
+          new EmptyS3AStatisticsContext().newInputStreamStatistics());
     }
 
     @Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java
index 3f84e2e0283..a9ebae276f3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.fs.common.BlockData;
 import org.apache.hadoop.fs.common.BufferData;
 import org.apache.hadoop.fs.common.ExceptionAsserts;
 import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
 import static org.junit.Assert.assertEquals;
@@ -41,6 +43,8 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
 
   private final ExecutorService threadPool = Executors.newFixedThreadPool(4);
   private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
+  private final S3AInputStreamStatistics streamStatistics =
+      new EmptyS3AStatisticsContext().newInputStreamStatistics();
 
   private final BlockData blockData = new BlockData(FILE_SIZE, BLOCK_SIZE);
 
@@ -51,33 +55,35 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
 
     // Should not throw.
     S3CachingBlockManager blockManager =
-        new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE);
+        new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics);
 
     // Verify it throws correctly.
     ExceptionAsserts.assertThrows(
-        IllegalArgumentException.class,
-        "'futurePool' must not be null",
-        () -> new S3CachingBlockManager(null, reader, blockData, POOL_SIZE));
+        NullPointerException.class,
+        () -> new S3CachingBlockManager(null, reader, blockData, POOL_SIZE, streamStatistics));
 
     ExceptionAsserts.assertThrows(
         IllegalArgumentException.class,
         "'reader' must not be null",
-        () -> new S3CachingBlockManager(futurePool, null, blockData, POOL_SIZE));
+        () -> new S3CachingBlockManager(futurePool, null, blockData, POOL_SIZE, streamStatistics));
 
     ExceptionAsserts.assertThrows(
         IllegalArgumentException.class,
         "'blockData' must not be null",
-        () -> new S3CachingBlockManager(futurePool, reader, null, POOL_SIZE));
+        () -> new S3CachingBlockManager(futurePool, reader, null, POOL_SIZE, streamStatistics));
 
     ExceptionAsserts.assertThrows(
         IllegalArgumentException.class,
         "'bufferPoolSize' must be a positive integer",
-        () -> new S3CachingBlockManager(futurePool, reader, blockData, 0));
+        () -> new S3CachingBlockManager(futurePool, reader, blockData, 0, streamStatistics));
 
     ExceptionAsserts.assertThrows(
         IllegalArgumentException.class,
         "'bufferPoolSize' must be a positive integer",
-        () -> new S3CachingBlockManager(futurePool, reader, blockData, -1));
+        () -> new S3CachingBlockManager(futurePool, reader, blockData, -1, streamStatistics));
+
+    ExceptionAsserts.assertThrows(NullPointerException.class,
+        () -> new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE, null));
 
     ExceptionAsserts.assertThrows(
         IllegalArgumentException.class,
@@ -108,8 +114,9 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
         ExecutorServiceFuturePool futurePool,
         S3Reader reader,
         BlockData blockData,
-        int bufferPoolSize) {
-      super(futurePool, reader, blockData, bufferPoolSize);
+        int bufferPoolSize,
+        S3AInputStreamStatistics streamStatistics) {
+      super(futurePool, reader, blockData, bufferPoolSize, streamStatistics);
     }
 
     // If true, forces the next read operation to fail.
@@ -157,7 +164,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
     MockS3File s3File = new MockS3File(FILE_SIZE, true);
     S3Reader reader = new S3Reader(s3File);
     TestBlockManager blockManager =
-        new TestBlockManager(futurePool, reader, blockData, POOL_SIZE);
+        new TestBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics);
 
     for (int b = 0; b < blockData.getNumBlocks(); b++) {
       // We simulate caching failure for all even numbered blocks.
@@ -204,7 +211,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
     MockS3File s3File = new MockS3File(FILE_SIZE, false);
     S3Reader reader = new S3Reader(s3File);
     TestBlockManager blockManager =
-        new TestBlockManager(futurePool, reader, blockData, POOL_SIZE);
+        new TestBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics);
     assertInitialState(blockManager);
 
     int expectedNumErrors = 0;
@@ -236,7 +243,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
     MockS3File s3File = new MockS3File(FILE_SIZE, false);
     S3Reader reader = new S3Reader(s3File);
     S3CachingBlockManager blockManager =
-        new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE);
+        new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics);
     assertInitialState(blockManager);
 
     for (int b = 0; b < blockData.getNumBlocks(); b++) {
@@ -267,7 +274,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
     MockS3File s3File = new MockS3File(FILE_SIZE, false);
     S3Reader reader = new S3Reader(s3File);
     TestBlockManager blockManager =
-        new TestBlockManager(futurePool, reader, blockData, POOL_SIZE);
+        new TestBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics);
     assertInitialState(blockManager);
 
     int expectedNumErrors = 0;

