Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/07/26 11:04:50 UTC
[hadoop] branch feature-HADOOP-18028-s3a-prefetch updated: HADOOP-18190. Collect IOStatistics during S3A prefetching (#4458)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch feature-HADOOP-18028-s3a-prefetch
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/feature-HADOOP-18028-s3a-prefetch by this push:
new a964fbf42f0 HADOOP-18190. Collect IOStatistics during S3A prefetching (#4458)
a964fbf42f0 is described below
commit a964fbf42f056ccf5a22c9cb0b07f241a3854f92
Author: ahmarsuhail <ah...@gmail.com>
AuthorDate: Tue Jul 26 12:04:30 2022 +0100
HADOOP-18190. Collect IOStatistics during S3A prefetching (#4458)
This adds IOStatistics collection to the S3PrefetchingInputStream class, with
new statistic names in StreamStatistics.
This stream is not (yet) IOStatisticsContext aware.
Contributed by Ahmar Suhail
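
As a quick illustration of what this change surfaces to callers, the sketch below opens a file and prints the collected statistics. It is a minimal sketch only: the bucket and object path are hypothetical, and prefetching is assumed to be enabled via the fs.s3a.prefetch.enabled option.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;

public class PrefetchStatsProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // assumption: prefetching switched on for this client
    conf.setBoolean("fs.s3a.prefetch.enabled", true);
    // hypothetical object; substitute a real path
    Path path = new Path("s3a://example-bucket/large-file.bin");
    FileSystem fs = path.getFileSystem(conf);
    try (FSDataInputStream in = fs.open(path)) {
      byte[] buffer = new byte[1024 * 1024];
      int bytesRead = in.read(buffer);
      // FSDataInputStream is an IOStatisticsSource; the prefetching stream
      // publishes the new counters/gauges/durations through it.
      IOStatistics stats = in.getIOStatistics();
      System.out.println("read " + bytesRead + " bytes");
      System.out.println(IOStatisticsLogging.ioStatisticsToPrettyString(stats));
    }
  }
}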
---
.../hadoop/fs/statistics/StreamStatisticNames.java | 40 ++++++++++++
.../org/apache/hadoop/fs/common/BufferPool.java | 24 +++++--
.../hadoop/fs/common/CachingBlockManager.java | 64 ++++++++++++++----
.../hadoop/fs/common/PrefetchingStatistics.java | 67 +++++++++++++++++++
.../hadoop/fs/common/SingleFilePerBlockCache.java | 14 +++-
.../apache/hadoop/fs/s3a/S3AInstrumentation.java | 61 ++++++++++++++++-
.../java/org/apache/hadoop/fs/s3a/Statistic.java | 12 ++++
.../hadoop/fs/s3a/read/S3CachingBlockManager.java | 7 +-
.../hadoop/fs/s3a/read/S3CachingInputStream.java | 15 ++++-
.../org/apache/hadoop/fs/s3a/read/S3Reader.java | 17 +++--
.../s3a/statistics/S3AInputStreamStatistics.java | 3 +-
.../statistics/impl/EmptyS3AStatisticsContext.java | 36 ++++++++++
.../fs/common/EmptyPrefetchingStatistics.java | 76 ++++++++++++++++++++++
.../apache/hadoop/fs/common/TestBlockCache.java | 11 +++-
.../apache/hadoop/fs/common/TestBufferPool.java | 23 ++++---
.../fs/s3a/ITestS3PrefetchingInputStream.java | 24 ++++++-
.../java/org/apache/hadoop/fs/s3a/read/Fakes.java | 5 +-
.../fs/s3a/read/TestS3CachingBlockManager.java | 35 ++++++----
18 files changed, 471 insertions(+), 63 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index ca755f08419..5a55ca522d6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -387,6 +387,46 @@ public final class StreamStatisticNames {
public static final String BLOCKS_RELEASED
= "blocks_released";
+ /**
+ * Total number of prefetching operations executed.
+ */
+ public static final String STREAM_READ_PREFETCH_OPERATIONS
+ = "stream_read_prefetch_operations";
+
+ /**
+ * Total number of blocks in the disk cache.
+ */
+ public static final String STREAM_READ_BLOCKS_IN_FILE_CACHE
+ = "stream_read_blocks_in_cache";
+
+ /**
+ * Total number of active prefetch operations.
+ */
+ public static final String STREAM_READ_ACTIVE_PREFETCH_OPERATIONS
+ = "stream_read_active_prefetch_operations";
+
+ /**
+ * Total bytes of memory in use by this input stream.
+ */
+ public static final String STREAM_READ_ACTIVE_MEMORY_IN_USE
+ = "stream_read_active_memory_in_use";
+
+ /**
+ * count/duration of reading a remote block.
+ *
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_REMOTE_BLOCK_READ
+ = "stream_read_block_read";
+
+ /**
+ * count/duration of acquiring a buffer and reading into it.
+ *
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_BLOCK_ACQUIRE_AND_READ
+ = "stream_read_block_acquire_read";
+
private StreamStatisticNames() {
}
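
A hedged sketch of how the new names are read back once a prefetching stream publishes them; the helper class and method here are illustrative only, not part of the patch.

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

final class PrefetchStatNames {
  // Reads the new counter and gauge entries from an already-open stream.
  static void logPrefetchStats(FSDataInputStream in) {
    IOStatistics stats = in.getIOStatistics();
    // Counter: total prefetch operations issued so far.
    Long prefetches = stats.counters()
        .get(StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS);
    // Gauges: point-in-time values such as blocks currently in the disk
    // cache and bytes of buffer-pool memory currently allocated.
    Long cachedBlocks = stats.gauges()
        .get(StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE);
    Long memoryInUse = stats.gauges()
        .get(StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE);
    System.out.printf("prefetches=%s, cached blocks=%s, memory in use=%s%n",
        prefetches, cachedBlocks, memoryInUse);
  }
}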
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java
index b151ed439af..bd7da11ddd8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java
@@ -31,6 +31,8 @@ import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.util.Objects.requireNonNull;
+
/**
* Manages a fixed pool of {@code ByteBuffer} instances.
*
@@ -56,26 +58,32 @@ public class BufferPool implements Closeable {
// Allows associating metadata to each buffer in the pool.
private Map<BufferData, ByteBuffer> allocated;
+ private PrefetchingStatistics prefetchingStatistics;
+
/**
* Initializes a new instance of the {@code BufferPool} class.
*
* @param size number of buffer in this pool.
* @param bufferSize size in bytes of each buffer.
+ * @param prefetchingStatistics statistics for this stream.
*
* @throws IllegalArgumentException if size is zero or negative.
* @throws IllegalArgumentException if bufferSize is zero or negative.
*/
- public BufferPool(int size, int bufferSize) {
+ public BufferPool(int size, int bufferSize, PrefetchingStatistics prefetchingStatistics) {
Validate.checkPositiveInteger(size, "size");
Validate.checkPositiveInteger(bufferSize, "bufferSize");
this.size = size;
this.bufferSize = bufferSize;
this.allocated = new IdentityHashMap<BufferData, ByteBuffer>();
+ this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
this.pool = new BoundedResourcePool<ByteBuffer>(size) {
@Override
public ByteBuffer createNew() {
- return ByteBuffer.allocate(bufferSize);
+ ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+ prefetchingStatistics.memoryAllocated(bufferSize);
+ return buffer;
}
};
}
@@ -236,11 +244,15 @@ public class BufferPool implements Closeable {
}
}
- this.pool.close();
- this.pool = null;
+ int currentPoolSize = pool.numCreated();
+
+ pool.close();
+ pool = null;
+
+ allocated.clear();
+ allocated = null;
- this.allocated.clear();
- this.allocated = null;
+ prefetchingStatistics.memoryFreed(currentPoolSize * bufferSize);
}
// For debugging purposes.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java
index 1bb439a9997..1207d3d0318 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.common;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -31,6 +33,10 @@ import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+
+import static java.util.Objects.requireNonNull;
+
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
@@ -70,33 +76,37 @@ public abstract class CachingBlockManager extends BlockManager {
// Once set to true, any further caching requests will be ignored.
private final AtomicBoolean cachingDisabled;
+ private final PrefetchingStatistics prefetchingStatistics;
+
/**
* Constructs an instance of a {@code CachingBlockManager}.
*
* @param futurePool asynchronous tasks are performed in this pool.
* @param blockData information about each block of the underlying file.
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
+ * @param prefetchingStatistics statistics for this stream.
*
- * @throws IllegalArgumentException if futurePool is null.
* @throws IllegalArgumentException if bufferPoolSize is zero or negative.
*/
public CachingBlockManager(
ExecutorServiceFuturePool futurePool,
BlockData blockData,
- int bufferPoolSize) {
+ int bufferPoolSize,
+ PrefetchingStatistics prefetchingStatistics) {
super(blockData);
- Validate.checkNotNull(futurePool, "futurePool");
Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
- this.futurePool = futurePool;
+ this.futurePool = requireNonNull(futurePool);
this.bufferPoolSize = bufferPoolSize;
this.numCachingErrors = new AtomicInteger();
this.numReadErrors = new AtomicInteger();
this.cachingDisabled = new AtomicBoolean();
+ this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
if (this.getBlockData().getFileSize() > 0) {
- this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize());
+ this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
+ this.prefetchingStatistics);
this.cache = this.createCache();
}
@@ -249,7 +259,7 @@ public abstract class CachingBlockManager extends BlockManager {
}
BlockOperations.Operation op = this.ops.requestPrefetch(blockNumber);
- PrefetchTask prefetchTask = new PrefetchTask(data, this);
+ PrefetchTask prefetchTask = new PrefetchTask(data, this, Instant.now());
Future<Void> prefetchFuture = this.futurePool.executeFunction(prefetchTask);
data.setPrefetch(prefetchFuture);
this.ops.end(op);
@@ -279,8 +289,10 @@ public abstract class CachingBlockManager extends BlockManager {
}
}
- private void prefetch(BufferData data) throws IOException {
+ private void prefetch(BufferData data, Instant taskQueuedStartTime) throws IOException {
synchronized (data) {
+ prefetchingStatistics.executorAcquired(
+ Duration.between(taskQueuedStartTime, Instant.now()));
this.readBlock(
data,
true,
@@ -297,6 +309,7 @@ public abstract class CachingBlockManager extends BlockManager {
}
BlockOperations.Operation op = null;
+ DurationTracker tracker = null;
synchronized (data) {
try {
@@ -318,6 +331,7 @@ public abstract class CachingBlockManager extends BlockManager {
}
if (isPrefetch) {
+ tracker = prefetchingStatistics.prefetchOperationStarted();
op = this.ops.prefetch(data.getBlockNumber());
} else {
op = this.ops.getRead(data.getBlockNumber());
@@ -333,6 +347,11 @@ public abstract class CachingBlockManager extends BlockManager {
} catch (Exception e) {
String message = String.format("error during readBlock(%s)", data.getBlockNumber());
LOG.error(message, e);
+
+ if (isPrefetch && tracker != null) {
+ tracker.failed();
+ }
+
this.numReadErrors.incrementAndGet();
data.setDone();
throw e;
@@ -340,6 +359,13 @@ public abstract class CachingBlockManager extends BlockManager {
if (op != null) {
this.ops.end(op);
}
+
+ if (isPrefetch) {
+ prefetchingStatistics.prefetchOperationCompleted();
+ if (tracker != null) {
+ tracker.close();
+ }
+ }
}
}
}
@@ -350,16 +376,18 @@ public abstract class CachingBlockManager extends BlockManager {
private static class PrefetchTask implements Supplier<Void> {
private final BufferData data;
private final CachingBlockManager blockManager;
+ private final Instant taskQueuedStartTime;
- PrefetchTask(BufferData data, CachingBlockManager blockManager) {
+ PrefetchTask(BufferData data, CachingBlockManager blockManager, Instant taskQueuedStartTime) {
this.data = data;
this.blockManager = blockManager;
+ this.taskQueuedStartTime = taskQueuedStartTime;
}
@Override
public Void get() {
try {
- this.blockManager.prefetch(data);
+ this.blockManager.prefetch(data, taskQueuedStartTime);
} catch (Exception e) {
LOG.error("error during prefetch", e);
}
@@ -420,14 +448,18 @@ public abstract class CachingBlockManager extends BlockManager {
blockFuture = cf;
}
- CachePutTask task = new CachePutTask(data, blockFuture, this);
+ CachePutTask task = new CachePutTask(data, blockFuture, this, Instant.now());
Future<Void> actionFuture = this.futurePool.executeFunction(task);
data.setCaching(actionFuture);
this.ops.end(op);
}
}
- private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture) {
+ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
+ Instant taskQueuedStartTime) {
+ prefetchingStatistics.executorAcquired(
+ Duration.between(taskQueuedStartTime, Instant.now()));
+
if (this.closed) {
return;
}
@@ -493,7 +525,7 @@ public abstract class CachingBlockManager extends BlockManager {
}
protected BlockCache createCache() {
- return new SingleFilePerBlockCache();
+ return new SingleFilePerBlockCache(prefetchingStatistics);
}
protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
@@ -513,18 +545,22 @@ public abstract class CachingBlockManager extends BlockManager {
// Block manager that manages this block.
private final CachingBlockManager blockManager;
+ private final Instant taskQueuedStartTime;
+
CachePutTask(
BufferData data,
Future<Void> blockFuture,
- CachingBlockManager blockManager) {
+ CachingBlockManager blockManager,
+ Instant taskQueuedStartTime) {
this.data = data;
this.blockFuture = blockFuture;
this.blockManager = blockManager;
+ this.taskQueuedStartTime = taskQueuedStartTime;
}
@Override
public Void get() {
- this.blockManager.addToCacheAndRelease(this.data, this.blockFuture);
+ this.blockManager.addToCacheAndRelease(this.data, this.blockFuture, taskQueuedStartTime);
return null;
}
}
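
The readBlock() changes above follow the usual DurationTracker life cycle: start the tracker when a prefetch begins, mark it failed on error, and close it when the operation completes. A standalone sketch of that ordering, with readRemoteBlock() as a placeholder for the real read:

import java.io.IOException;

import org.apache.hadoop.fs.common.PrefetchingStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;

final class PrefetchTrackingSketch {

  private final PrefetchingStatistics statistics;

  PrefetchTrackingSketch(PrefetchingStatistics statistics) {
    this.statistics = statistics;
  }

  // Mirrors the started/failed/close ordering used in readBlock() above.
  void prefetchWithTracking() throws IOException {
    DurationTracker tracker = statistics.prefetchOperationStarted();
    try {
      readRemoteBlock();                       // stand-in for the real block read
    } catch (IOException e) {
      tracker.failed();                        // failure duration recorded separately
      throw e;
    } finally {
      statistics.prefetchOperationCompleted(); // active-prefetch gauge decremented
      tracker.close();                         // duration recorded, success or failure
    }
  }

  private void readRemoteBlock() throws IOException {
    // placeholder for the actual remote read
  }
}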
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/PrefetchingStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/PrefetchingStatistics.java
new file mode 100644
index 00000000000..b1894f97696
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/PrefetchingStatistics.java
@@ -0,0 +1,67 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.time.Duration;
+
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+
+public interface PrefetchingStatistics extends IOStatisticsSource {
+
+ /**
+ * A prefetch operation has started.
+ * @return duration tracker
+ */
+ DurationTracker prefetchOperationStarted();
+
+ /**
+ * A block has been saved to the file cache.
+ */
+ void blockAddedToFileCache();
+
+ /**
+ * A block has been removed from the file cache.
+ */
+ void blockRemovedFromFileCache();
+
+ /**
+ * A prefetch operation has completed.
+ */
+ void prefetchOperationCompleted();
+
+ /**
+ * An executor has been acquired, either for prefetching or caching.
+ * @param timeInQueue time taken to acquire an executor.
+ */
+ void executorAcquired(Duration timeInQueue);
+
+ /**
+ * A new buffer has been added to the buffer pool.
+ * @param size size of the new buffer
+ */
+ void memoryAllocated(int size);
+
+ /**
+ * Previously allocated memory has been freed.
+ * @param size size of memory freed.
+ */
+ void memoryFreed(int size);
+}
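
For illustration only, a minimal in-memory implementation of this interface backed by atomics. The class name is hypothetical and not part of the patch; the real implementations are S3AInstrumentation's input stream statistics and the test-only EmptyPrefetchingStatistics added below.

import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.fs.common.PrefetchingStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker;

// Hypothetical counting implementation, for illustration only.
final class CountingPrefetchingStatistics implements PrefetchingStatistics {

  private final AtomicLong activePrefetches = new AtomicLong();
  private final AtomicLong blocksInFileCache = new AtomicLong();
  private final AtomicLong memoryInUse = new AtomicLong();

  @Override
  public DurationTracker prefetchOperationStarted() {
    activePrefetches.incrementAndGet();
    return stubDurationTracker();   // no duration recording in this sketch
  }

  @Override
  public void prefetchOperationCompleted() {
    activePrefetches.decrementAndGet();
  }

  @Override
  public void blockAddedToFileCache() {
    blocksInFileCache.incrementAndGet();
  }

  @Override
  public void blockRemovedFromFileCache() {
    blocksInFileCache.decrementAndGet();
  }

  @Override
  public void executorAcquired(Duration timeInQueue) {
    // queue time ignored in this sketch
  }

  @Override
  public void memoryAllocated(int size) {
    memoryInUse.addAndGet(size);
  }

  @Override
  public void memoryFreed(int size) {
    memoryInUse.addAndGet(-size);
  }
}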
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java
index 0f3d59b6cb9..7252c294bee 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/SingleFilePerBlockCache.java
@@ -42,6 +42,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static java.util.Objects.requireNonNull;
+
/**
* Provides functionality necessary for caching blocks of data read from FileSystem.
* Each cache block is stored on the local disk as a separate file.
@@ -58,6 +60,8 @@ public class SingleFilePerBlockCache implements BlockCache {
private boolean closed;
+ private final PrefetchingStatistics prefetchingStatistics;
+
// Cache entry.
// Each block is stored as a separate file.
private static class Entry {
@@ -81,7 +85,13 @@ public class SingleFilePerBlockCache implements BlockCache {
}
}
- public SingleFilePerBlockCache() {
+ /**
+ * Constructs an instance of a {@code SingleFilePerBlockCache}.
+ *
+ * @param prefetchingStatistics statistics for this stream.
+ */
+ public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
+ this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
}
/**
@@ -184,6 +194,7 @@ public class SingleFilePerBlockCache implements BlockCache {
}
this.writeFile(blockFilePath, buffer);
+ this.prefetchingStatistics.blockAddedToFileCache();
long checksum = BufferData.getChecksum(buffer);
Entry entry = new Entry(blockNumber, blockFilePath, buffer.limit(), checksum);
this.blocks.put(blockNumber, entry);
@@ -221,6 +232,7 @@ public class SingleFilePerBlockCache implements BlockCache {
for (Entry entry : this.blocks.values()) {
try {
Files.deleteIfExists(entry.path);
+ this.prefetchingStatistics.blockRemovedFromFileCache();
numFilesDeleted++;
} catch (IOException e) {
// Ignore while closing so that we can delete as many cache files as possible.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index eec63667201..22c28950067 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -837,11 +837,19 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
StreamStatisticNames.STREAM_READ_TOTAL_BYTES,
StreamStatisticNames.STREAM_READ_UNBUFFERED,
StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES)
- .withGauges(STREAM_READ_GAUGE_INPUT_POLICY)
+ .withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
+ STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
+ STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
+ STREAM_READ_ACTIVE_MEMORY_IN_USE.getSymbol()
+ )
.withDurationTracking(ACTION_HTTP_GET_REQUEST,
+ ACTION_EXECUTOR_ACQUIRED,
StoreStatisticNames.ACTION_FILE_OPENED,
StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
- StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED)
+ StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED,
+ StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS,
+ StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ,
+ StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ)
.build();
setIOStatistics(st);
aborted = st.getCounterReference(
@@ -902,6 +910,18 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
return incCounter(name, value);
}
+ /**
+ * Increment the Statistic gauge and the local IOStatistics
+ * equivalent.
+ * @param statistic statistic
+ * @param v value.
+ * @return local IOStatistic value
+ */
+ private long incAllGauges(Statistic statistic, long v) {
+ incrementGauge(statistic, v);
+ return incGauge(statistic.getSymbol(), v);
+ }
+
/**
* {@inheritDoc}.
* Increments the number of seek operations,
@@ -1017,6 +1037,12 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
}
}
+ @Override
+ public void executorAcquired(Duration timeInQueue) {
+ // update the duration fields in the IOStatistics.
+ localIOStatistics().addTimedOperation(ACTION_EXECUTOR_ACQUIRED, timeInQueue);
+ }
+
/**
* {@code close()} merges the stream statistics into the filesystem's
* instrumentation instance.
@@ -1281,6 +1307,37 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
? StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED
: StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED);
}
+
+ @Override
+ public DurationTracker prefetchOperationStarted() {
+ incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, 1);
+ return trackDuration(StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS);
+ }
+
+ @Override
+ public void blockAddedToFileCache() {
+ incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, 1);
+ }
+
+ @Override
+ public void blockRemovedFromFileCache() {
+ incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, -1);
+ }
+
+ @Override
+ public void prefetchOperationCompleted() {
+ incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1);
+ }
+
+ @Override
+ public void memoryAllocated(int size) {
+ incAllGauges(STREAM_READ_ACTIVE_MEMORY_IN_USE, size);
+ }
+
+ @Override
+ public void memoryFreed(int size) {
+ incAllGauges(STREAM_READ_ACTIVE_MEMORY_IN_USE, -size);
+ }
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 86cb18076cc..b154447c607 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -357,6 +357,18 @@ public enum Statistic {
StreamStatisticNames.STREAM_READ_TOTAL_BYTES,
"Total count of bytes read from an input stream",
TYPE_COUNTER),
+ STREAM_READ_BLOCKS_IN_FILE_CACHE(
+ StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE,
+ "Gauge of blocks in disk cache",
+ TYPE_GAUGE),
+ STREAM_READ_ACTIVE_PREFETCH_OPERATIONS(
+ StreamStatisticNames.STREAM_READ_ACTIVE_PREFETCH_OPERATIONS,
+ "Gauge of active prefetches",
+ TYPE_GAUGE),
+ STREAM_READ_ACTIVE_MEMORY_IN_USE(
+ StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE,
+ "Gauge of active memory in use",
+ TYPE_GAUGE),
/* Stream Write statistics */
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java
index 674a5ccbdd8..1c058087f31 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.common.BlockData;
import org.apache.hadoop.fs.common.CachingBlockManager;
import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.common.Validate;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
/**
* Provides access to S3 file one block at a time.
@@ -48,6 +49,7 @@ public class S3CachingBlockManager extends CachingBlockManager {
* @param reader reader that reads from S3 file.
* @param blockData information about each block of the S3 file.
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
+ * @param streamStatistics statistics for this stream.
*
* @throws IllegalArgumentException if reader is null.
*/
@@ -55,8 +57,9 @@ public class S3CachingBlockManager extends CachingBlockManager {
ExecutorServiceFuturePool futurePool,
S3Reader reader,
BlockData blockData,
- int bufferPoolSize) {
- super(futurePool, blockData, bufferPoolSize);
+ int bufferPoolSize,
+ S3AInputStreamStatistics streamStatistics) {
+ super(futurePool, blockData, bufferPoolSize, streamStatistics);
Validate.checkNotNull(reader, "reader");
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
index b6c6bf39988..b00119ac4e1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
@@ -33,6 +33,9 @@ import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
+
/**
* Provides an {@code InputStream} that allows reading from an S3 file.
* Prefetched blocks are cached to local disk if a seek away from the
@@ -120,8 +123,10 @@ public class S3CachingInputStream extends S3InputStream {
@Override
public void close() throws IOException {
- super.close();
+ // Close the BlockManager first, cancelling active prefetches,
+ // deleting cached files and freeing memory used by buffer pool.
this.blockManager.close();
+ super.close();
LOG.info("closed: {}", this.getName());
}
@@ -171,7 +176,10 @@ public class S3CachingInputStream extends S3InputStream {
}
}
- BufferData data = this.blockManager.get(toBlockNumber);
+ BufferData data = invokeTrackingDuration(
+ this.getS3AStreamStatistics().trackDuration(STREAM_READ_BLOCK_ACQUIRE_AND_READ),
+ () -> this.blockManager.get(toBlockNumber));
+
this.getFilePosition().setData(data, startOffset, readPos);
return true;
}
@@ -193,6 +201,7 @@ public class S3CachingInputStream extends S3InputStream {
S3Reader reader,
BlockData blockData,
int bufferPoolSize) {
- return new S3CachingBlockManager(futurePool, reader, blockData, bufferPoolSize);
+ return new S3CachingBlockManager(futurePool, reader, blockData, bufferPoolSize,
+ this.getS3AStreamStatistics());
}
}
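
The timed block acquisition above relies on the IOStatisticsBinding helpers. A hedged, standalone sketch of the same wrapping pattern, with the statistics source and statistic name passed in as placeholders and fetchBlock() standing in for the real work:

import java.io.IOException;

import org.apache.hadoop.fs.statistics.DurationTrackerFactory;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;

final class TimedGetSketch {

  // Times an operation with the same helper used in S3CachingInputStream;
  // the duration is recorded whether the callable succeeds or fails.
  static byte[] timedFetch(DurationTrackerFactory factory,
      String statisticName) throws IOException {
    return invokeTrackingDuration(
        factory.trackDuration(statisticName),
        () -> fetchBlock());
  }

  private static byte[] fetchBlock() throws IOException {
    return new byte[0];          // placeholder for the real block fetch
  }
}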
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java
index 89e3618be53..19ab4f6961d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3Reader.java
@@ -31,6 +31,10 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.common.Validate;
import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
/**
* Provides functionality to read S3 file one block at a time.
@@ -47,6 +51,8 @@ public class S3Reader implements Closeable {
// Set to true by close().
private volatile boolean closed;
+ private final S3AInputStreamStatistics streamStatistics;
+
/**
* Constructs an instance of {@link S3Reader}.
*
@@ -58,6 +64,7 @@ public class S3Reader implements Closeable {
Validate.checkNotNull(s3File, "s3File");
this.s3File = s3File;
+ this.streamStatistics = this.s3File.getStatistics();
}
/**
@@ -95,26 +102,24 @@ public class S3Reader implements Closeable {
private int readOneBlockWithRetries(ByteBuffer buffer, long offset, int size)
throws IOException {
- this.s3File.getStatistics().readOperationStarted(offset, size);
+ this.streamStatistics.readOperationStarted(offset, size);
Invoker invoker = this.s3File.getReadInvoker();
- int invokerResponse = invoker.retry(
- "read", this.s3File.getPath(), true,
- () -> {
+ int invokerResponse = invoker.retry("read", this.s3File.getPath(), true,
+ trackDurationOfOperation(streamStatistics, STREAM_READ_REMOTE_BLOCK_READ, () -> {
try {
this.readOneBlock(buffer, offset, size);
} catch (EOFException e) {
// the base implementation swallows EOFs.
return -1;
} catch (SocketTimeoutException e) {
- this.s3File.getStatistics().readException();
throw e;
} catch (IOException e) {
this.s3File.getStatistics().readException();
throw e;
}
return 0;
- });
+ }));
int numBytesRead = buffer.position();
buffer.limit(numBytesRead);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
index 539af2bde36..e74a6d59a86 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.s3a.statistics;
+import org.apache.hadoop.fs.common.PrefetchingStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
/**
@@ -26,7 +27,7 @@ import org.apache.hadoop.fs.statistics.DurationTracker;
* It also contains getters for tests.
*/
public interface S3AInputStreamStatistics extends AutoCloseable,
- S3AStatisticInterface {
+ S3AStatisticInterface, PrefetchingStatistics {
/**
* Seek backwards, incrementing the seek and backward seek counters.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index f618270798e..1f9d75a9f78 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -208,6 +208,41 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
}
+ @Override
+ public DurationTracker prefetchOperationStarted() {
+ return stubDurationTracker();
+ }
+
+ @Override
+ public void prefetchOperationCompleted() {
+
+ }
+
+ @Override
+ public void blockAddedToFileCache() {
+
+ }
+
+ @Override
+ public void blockRemovedFromFileCache() {
+
+ }
+
+ @Override
+ public void executorAcquired(Duration timeInQueue) {
+
+ }
+
+ @Override
+ public void memoryAllocated(int size) {
+
+ }
+
+ @Override
+ public void memoryFreed(int size) {
+
+ }
+
/**
* Return an IO statistics instance.
* @return an empty IO statistics instance.
@@ -341,6 +376,7 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
public DurationTracker initiateInnerStreamClose(final boolean abort) {
return stubDurationTracker();
}
+
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/EmptyPrefetchingStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/EmptyPrefetchingStatistics.java
new file mode 100644
index 00000000000..ce13d2d9929
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/EmptyPrefetchingStatistics.java
@@ -0,0 +1,76 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.time.Duration;
+
+import org.apache.hadoop.fs.statistics.DurationTracker;
+
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker;
+
+public final class EmptyPrefetchingStatistics implements PrefetchingStatistics {
+
+ private static final EmptyPrefetchingStatistics EMPTY_PREFETCHING_STATISTICS =
+ new EmptyPrefetchingStatistics();
+
+ private EmptyPrefetchingStatistics() {
+ }
+
+ public static EmptyPrefetchingStatistics getInstance() {
+ return EMPTY_PREFETCHING_STATISTICS;
+ }
+
+ @Override
+ public DurationTracker prefetchOperationStarted() {
+ return stubDurationTracker();
+ }
+
+ @Override
+ public void blockAddedToFileCache() {
+
+ }
+
+ @Override
+ public void blockRemovedFromFileCache() {
+
+ }
+
+ @Override
+ public void prefetchOperationCompleted() {
+
+ }
+
+ @Override
+ public void executorAcquired(Duration timeInQueue) {
+
+ }
+
+ @Override
+ public void memoryAllocated(int size) {
+
+ }
+
+ @Override
+ public void memoryFreed(int size) {
+
+ }
+
+}
+
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java
index b1344c6972c..c402673a49d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import org.junit.Test;
+import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.junit.Assert.assertEquals;
@@ -38,7 +39,8 @@ public class TestBlockCache extends AbstractHadoopTestBase {
@Test
public void testArgChecks() throws Exception {
// Should not throw.
- BlockCache cache = new SingleFilePerBlockCache();
+ BlockCache cache =
+ new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance());
ByteBuffer buffer = ByteBuffer.allocate(16);
@@ -47,12 +49,17 @@ public class TestBlockCache extends AbstractHadoopTestBase {
IllegalArgumentException.class,
"'buffer' must not be null",
() -> cache.put(42, null));
+
+ ExceptionAsserts.assertThrows(
+ NullPointerException.class,
+ () -> new SingleFilePerBlockCache(null));
}
@Test
public void testPutAndGet() throws Exception {
- BlockCache cache = new SingleFilePerBlockCache();
+ BlockCache cache =
+ new SingleFilePerBlockCache(new EmptyS3AStatisticsContext().newInputStreamStatistics());
ByteBuffer buffer1 = ByteBuffer.allocate(BUFFER_SIZE);
for (byte i = 0; i < BUFFER_SIZE; i++) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java
index 43be295cb38..c9134f1e251 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.common;
import org.junit.Test;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.junit.Assert.assertEquals;
@@ -32,33 +34,38 @@ public class TestBufferPool extends AbstractHadoopTestBase {
private static final int POOL_SIZE = 2;
private static final int BUFFER_SIZE = 10;
-
+ private final S3AInputStreamStatistics s3AInputStreamStatistics =
+ new EmptyS3AStatisticsContext().newInputStreamStatistics();
@Test
public void testArgChecks() throws Exception {
// Should not throw.
- BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE);
+ BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, s3AInputStreamStatistics);
// Verify it throws correctly.
ExceptionAsserts.assertThrows(
IllegalArgumentException.class,
"'size' must be a positive integer",
- () -> new BufferPool(0, 10));
+ () -> new BufferPool(0, 10, s3AInputStreamStatistics));
ExceptionAsserts.assertThrows(
IllegalArgumentException.class,
"'size' must be a positive integer",
- () -> new BufferPool(-1, 10));
+ () -> new BufferPool(-1, 10, s3AInputStreamStatistics));
ExceptionAsserts.assertThrows(
IllegalArgumentException.class,
"'bufferSize' must be a positive integer",
- () -> new BufferPool(10, 0));
+ () -> new BufferPool(10, 0, s3AInputStreamStatistics));
ExceptionAsserts.assertThrows(
IllegalArgumentException.class,
"'bufferSize' must be a positive integer",
- () -> new BufferPool(1, -10));
+ () -> new BufferPool(1, -10, s3AInputStreamStatistics));
+
+ ExceptionAsserts.assertThrows(
+ NullPointerException.class,
+ () -> new BufferPool(1, 10, null));
ExceptionAsserts.assertThrows(
IllegalArgumentException.class,
@@ -78,7 +85,7 @@ public class TestBufferPool extends AbstractHadoopTestBase {
@Test
public void testGetAndRelease() {
- BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE);
+ BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, s3AInputStreamStatistics);
assertInitialState(pool, POOL_SIZE);
int count = 0;
@@ -125,7 +132,7 @@ public class TestBufferPool extends AbstractHadoopTestBase {
private void testReleaseHelper(BufferData.State stateBeforeRelease, boolean expectThrow)
throws Exception {
- BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE);
+ BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, s3AInputStreamStatistics);
assertInitialState(pool, POOL_SIZE);
BufferData data = this.acquire(pool, 1);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java
index 9b831cb3b84..f46e93e1084 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3PrefetchingInputStream.java
@@ -39,6 +39,7 @@ import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
/**
@@ -104,10 +105,11 @@ public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
@Test
public void testReadLargeFileFully() throws Throwable {
describe("read a large file fully, uses S3CachingInputStream");
+ IOStatistics ioStats;
openFS();
try (FSDataInputStream in = largeFileFS.open(largeFile)) {
- IOStatistics ioStats = in.getIOStatistics();
+ ioStats = in.getIOStatistics();
byte[] buffer = new byte[S_1M * 10];
long bytesRead = 0;
@@ -115,20 +117,29 @@ public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
while (bytesRead < largeFileSize) {
in.readFully(buffer, 0, (int) Math.min(buffer.length, largeFileSize - bytesRead));
bytesRead += buffer.length;
+ // Blocks are fully read, no blocks should be cached
+ verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE,
+ 0);
}
+ // Assert that first block is read synchronously, following blocks are prefetched
+ verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS,
+ numBlocks - 1);
verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, numBlocks);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, numBlocks);
}
+ // Verify that once stream is closed, all memory is freed
+ verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
}
@Test
public void testRandomReadLargeFile() throws Throwable {
describe("random read on a large file, uses S3CachingInputStream");
+ IOStatistics ioStats;
openFS();
try (FSDataInputStream in = largeFileFS.open(largeFile)) {
- IOStatistics ioStats = in.getIOStatistics();
+ ioStats = in.getIOStatistics();
byte[] buffer = new byte[blockSize];
@@ -141,7 +152,13 @@ public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 2);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 2);
+ verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 1);
+ // block 0 is cached when we seek to block 1, block 1 is cached as it is being prefetched
+ // when we seek out of block 0, see cancelPrefetches()
+ verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 2);
}
+ verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
+ verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
}
@Test
@@ -163,6 +180,9 @@ public class ITestS3PrefetchingInputStream extends AbstractS3ACostTest {
verifyStatisticCounterValue(ioStats, StoreStatisticNames.ACTION_HTTP_GET_REQUEST, 1);
verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_OPENED, 1);
+ verifyStatisticCounterValue(ioStats, StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS, 0);
+ // The buffer pool is not used
+ verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE, 0);
}
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java
index 7e91b6830d5..d2a045e335e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java
@@ -49,7 +49,6 @@ import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
-import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
@@ -284,6 +283,7 @@ public final class Fakes {
private final int writeDelay;
public TestS3FilePerBlockCache(int readDelay, int writeDelay) {
+ super(new EmptyS3AStatisticsContext().newInputStreamStatistics());
this.files = new ConcurrentHashMap<>();
this.readDelay = readDelay;
this.writeDelay = writeDelay;
@@ -337,7 +337,8 @@ public final class Fakes {
S3Reader reader,
BlockData blockData,
int bufferPoolSize) {
- super(futurePool, reader, blockData, bufferPoolSize);
+ super(futurePool, reader, blockData, bufferPoolSize,
+ new EmptyS3AStatisticsContext().newInputStreamStatistics());
}
@Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java
index 3f84e2e0283..a9ebae276f3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.fs.common.BlockData;
import org.apache.hadoop.fs.common.BufferData;
import org.apache.hadoop.fs.common.ExceptionAsserts;
import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
import org.apache.hadoop.test.AbstractHadoopTestBase;
import static org.junit.Assert.assertEquals;
@@ -41,6 +43,8 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
private final ExecutorService threadPool = Executors.newFixedThreadPool(4);
private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
+ private final S3AInputStreamStatistics streamStatistics =
+ new EmptyS3AStatisticsContext().newInputStreamStatistics();
private final BlockData blockData = new BlockData(FILE_SIZE, BLOCK_SIZE);
@@ -51,33 +55,35 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
// Should not throw.
S3CachingBlockManager blockManager =
- new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE);
+ new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics);
// Verify it throws correctly.
ExceptionAsserts.assertThrows(
- IllegalArgumentException.class,
- "'futurePool' must not be null",
- () -> new S3CachingBlockManager(null, reader, blockData, POOL_SIZE));
+ NullPointerException.class,
+ () -> new S3CachingBlockManager(null, reader, blockData, POOL_SIZE, streamStatistics));
ExceptionAsserts.assertThrows(
IllegalArgumentException.class,
"'reader' must not be null",
- () -> new S3CachingBlockManager(futurePool, null, blockData, POOL_SIZE));
+ () -> new S3CachingBlockManager(futurePool, null, blockData, POOL_SIZE, streamStatistics));
ExceptionAsserts.assertThrows(
IllegalArgumentException.class,
"'blockData' must not be null",
- () -> new S3CachingBlockManager(futurePool, reader, null, POOL_SIZE));
+ () -> new S3CachingBlockManager(futurePool, reader, null, POOL_SIZE, streamStatistics));
ExceptionAsserts.assertThrows(
IllegalArgumentException.class,
"'bufferPoolSize' must be a positive integer",
- () -> new S3CachingBlockManager(futurePool, reader, blockData, 0));
+ () -> new S3CachingBlockManager(futurePool, reader, blockData, 0, streamStatistics));
ExceptionAsserts.assertThrows(
IllegalArgumentException.class,
"'bufferPoolSize' must be a positive integer",
- () -> new S3CachingBlockManager(futurePool, reader, blockData, -1));
+ () -> new S3CachingBlockManager(futurePool, reader, blockData, -1, streamStatistics));
+
+ ExceptionAsserts.assertThrows(NullPointerException.class,
+ () -> new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE, null));
ExceptionAsserts.assertThrows(
IllegalArgumentException.class,
@@ -108,8 +114,9 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
ExecutorServiceFuturePool futurePool,
S3Reader reader,
BlockData blockData,
- int bufferPoolSize) {
- super(futurePool, reader, blockData, bufferPoolSize);
+ int bufferPoolSize,
+ S3AInputStreamStatistics streamStatistics) {
+ super(futurePool, reader, blockData, bufferPoolSize, streamStatistics);
}
// If true, forces the next read operation to fail.
@@ -157,7 +164,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
MockS3File s3File = new MockS3File(FILE_SIZE, true);
S3Reader reader = new S3Reader(s3File);
TestBlockManager blockManager =
- new TestBlockManager(futurePool, reader, blockData, POOL_SIZE);
+ new TestBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics);
for (int b = 0; b < blockData.getNumBlocks(); b++) {
// We simulate caching failure for all even numbered blocks.
@@ -204,7 +211,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
MockS3File s3File = new MockS3File(FILE_SIZE, false);
S3Reader reader = new S3Reader(s3File);
TestBlockManager blockManager =
- new TestBlockManager(futurePool, reader, blockData, POOL_SIZE);
+ new TestBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics);
assertInitialState(blockManager);
int expectedNumErrors = 0;
@@ -236,7 +243,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
MockS3File s3File = new MockS3File(FILE_SIZE, false);
S3Reader reader = new S3Reader(s3File);
S3CachingBlockManager blockManager =
- new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE);
+ new S3CachingBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics);
assertInitialState(blockManager);
for (int b = 0; b < blockData.getNumBlocks(); b++) {
@@ -267,7 +274,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
MockS3File s3File = new MockS3File(FILE_SIZE, false);
S3Reader reader = new S3Reader(s3File);
TestBlockManager blockManager =
- new TestBlockManager(futurePool, reader, blockData, POOL_SIZE);
+ new TestBlockManager(futurePool, reader, blockData, POOL_SIZE, streamStatistics);
assertInitialState(blockManager);
int expectedNumErrors = 0;