You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink in the original page and is not preserved in this text.)
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/10/19 13:38:26 UTC
[hadoop] branch trunk updated: HADOOP-18189 S3APrefetchingInputStream to support status probes when closed (#5036)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8aa04b0b24b HADOOP-18189 S3APrefetchingInputStream to support status probes when closed (#5036)
8aa04b0b24b is described below
commit 8aa04b0b24b1481b2b9785bd2e5e7165a866a9b0
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Wed Oct 19 06:38:11 2022 -0700
HADOOP-18189 S3APrefetchingInputStream to support status probes when closed (#5036)
Contributed by Viraj Jasani
---
.../org/apache/hadoop/fs/s3a/S3AInputStream.java | 1 +
.../fs/s3a/prefetch/S3APrefetchingInputStream.java | 40 ++++++++++++----
.../fs/s3a/ITestS3APrefetchingInputStream.java | 54 ++++++++++++++++++++++
3 files changed, 86 insertions(+), 9 deletions(-)
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index be5b1799b35..4b50ab2c04b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -1164,6 +1164,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
+ @VisibleForTesting
public S3AInputStreamStatistics getS3AStreamStatistics() {
return streamStatistics;
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
index 76ef942ed65..f778f40b74c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
@@ -56,6 +57,21 @@ public class S3APrefetchingInputStream
*/
private S3ARemoteInputStream inputStream;
+ /**
+ * To be only used by synchronized getPos().
+ */
+ private long lastReadCurrentPos = 0;
+
+ /**
+ * To be only used by getIOStatistics().
+ */
+ private IOStatistics ioStatistics = null;
+
+ /**
+ * To be only used by getS3AStreamStatistics().
+ */
+ private S3AInputStreamStatistics inputStreamStatistics = null;
+
/**
* Initializes a new instance of the {@code S3APrefetchingInputStream} class.
*
@@ -115,14 +131,20 @@ public class S3APrefetchingInputStream
}
/**
- * Gets the current position.
+ * Gets the current position. If the underlying S3 input stream is closed,
+ * it returns the position last read from the underlying stream. If the
+ * position was never read before the underlying input stream was closed,
+ * this returns 0.
*
* @return the current position.
* @throws IOException if there is an IO error during this operation.
*/
@Override
public synchronized long getPos() throws IOException {
- return isClosed() ? 0 : inputStream.getPos();
+ if (!isClosed()) {
+ lastReadCurrentPos = inputStream.getPos();
+ }
+ return lastReadCurrentPos;
}
/**
@@ -215,11 +237,12 @@ public class S3APrefetchingInputStream
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
+ @VisibleForTesting
public S3AInputStreamStatistics getS3AStreamStatistics() {
- if (isClosed()) {
- return null;
+ if (!isClosed()) {
+ inputStreamStatistics = inputStream.getS3AStreamStatistics();
}
- return inputStream.getS3AStreamStatistics();
+ return inputStreamStatistics;
}
/**
@@ -229,10 +252,10 @@ public class S3APrefetchingInputStream
*/
@Override
public IOStatistics getIOStatistics() {
- if (isClosed()) {
- return null;
+ if (!isClosed()) {
+ ioStatistics = inputStream.getIOStatistics();
}
- return inputStream.getIOStatistics();
+ return ioStatistics;
}
protected boolean isClosed() {
@@ -249,7 +272,6 @@ public class S3APrefetchingInputStream
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
- throwIfClosed();
return false;
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
index d597c05e1da..93611220f94 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
@@ -240,4 +242,56 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
}
}
+ @Test
+ public void testStatusProbesAfterClosingStream() throws Throwable {
+ describe("When the underlying input stream is closed, the prefetch input stream"
+ + " should still support some status probes");
+
+ byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
+ Path smallFile = methodPath();
+ ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);
+
+ FSDataInputStream in = getFileSystem().open(smallFile);
+
+ byte[] buffer = new byte[SMALL_FILE_SIZE];
+ in.read(buffer, 0, S_1K * 4);
+ in.seek(S_1K * 12);
+ in.read(buffer, 0, S_1K * 4);
+
+ long pos = in.getPos();
+ IOStatistics ioStats = in.getIOStatistics();
+ S3AInputStreamStatistics inputStreamStatistics =
+ ((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();
+
+ assertNotNull("Prefetching input IO stats should not be null", ioStats);
+ assertNotNull("Prefetching input stream stats should not be null", inputStreamStatistics);
+ assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
+ pos);
+
+ in.close();
+
+ // status probes after closing the input stream
+ long newPos = in.getPos();
+ IOStatistics newIoStats = in.getIOStatistics();
+ S3AInputStreamStatistics newInputStreamStatistics =
+ ((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();
+
+ assertNotNull("Prefetching input IO stats should not be null", newIoStats);
+ assertNotNull("Prefetching input stream stats should not be null", newInputStreamStatistics);
+ assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
+ newPos);
+
+ // compare status probes after closing of the stream with status probes done before
+ // closing the stream
+ assertEquals("Position retrieved through stream before and after closing should match", pos,
+ newPos);
+ assertEquals("IO stats retrieved through stream before and after closing should match", ioStats,
+ newIoStats);
+ assertEquals("Stream stats retrieved through stream before and after closing should match",
+ inputStreamStatistics, newInputStreamStatistics);
+
+ assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10));
+
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org