You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/05/04 18:46:57 UTC
[hadoop] branch feature-HADOOP-18028-s3a-prefetch updated: HADOOP-18175. fix test failures with prefetching s3a input stream (#4212)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch feature-HADOOP-18028-s3a-prefetch
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/feature-HADOOP-18028-s3a-prefetch by this push:
new f38bbe2e9f6 HADOOP-18175. fix test failures with prefetching s3a input stream (#4212)
f38bbe2e9f6 is described below
commit f38bbe2e9f678b156e73ca881dc6230e368ecae4
Author: monthonk <47...@users.noreply.github.com>
AuthorDate: Wed May 4 19:46:39 2022 +0100
HADOOP-18175. fix test failures with prefetching s3a input stream (#4212)
Contributed by Monthon Klongklaew
---
.../java/org/apache/hadoop/fs/s3a/read/S3File.java | 1 +
.../org/apache/hadoop/fs/s3a/read/S3InputStream.java | 18 ++++++++----------
.../hadoop/fs/s3a/read/S3PrefetchingInputStream.java | 3 +--
.../apache/hadoop/fs/s3a/ITestS3ARequesterPays.java | 16 +++++++++++-----
.../org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java | 10 ++++++++++
.../apache/hadoop/fs/s3a/read/TestS3InputStream.java | 5 -----
6 files changed, 31 insertions(+), 22 deletions(-)
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
index 501186a2549..88854b87c81 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
@@ -164,6 +164,7 @@ public class S3File implements Closeable {
Validate.checkLessOrEqual(offset, "offset", size(), "size()");
Validate.checkLessOrEqual(size, "size", size() - offset, "size() - offset");
+ streamStatistics.streamOpened();
final GetObjectRequest request = client.newGetRequest(this.s3Attributes.getKey())
.withRange(offset, offset + size - 1);
this.changeTracker.maybeApplyConstraint(request);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
index 0fa6e33200b..00d5fbc367d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
@@ -254,6 +254,10 @@ public abstract class S3InputStream
public int read() throws IOException {
this.throwIfClosed();
+ if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
+ return -1;
+ }
+
if (!ensureCurrentBuffer()) {
return -1;
}
@@ -296,6 +300,10 @@ public abstract class S3InputStream
return 0;
}
+ if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
+ return -1;
+ }
+
if (!ensureCurrentBuffer()) {
return -1;
}
@@ -427,18 +435,8 @@ public abstract class S3InputStream
}
protected void throwIfInvalidSeek(long pos) throws EOFException {
- long fileSize = this.s3File.size();
if (pos < 0) {
throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
- } else {
- if (fileSize == 0 && pos == 0) {
- // Do nothing. Valid combination.
- return;
- }
-
- if (pos >= fileSize) {
- throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
- }
}
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
index c874a8c37b8..0f5834da4cc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
@@ -103,8 +103,7 @@ public class S3PrefetchingInputStream
*/
@Override
public synchronized long getPos() throws IOException {
- this.throwIfClosed();
- return this.inputStream.getPos();
+ return this.isClosed() ? 0 : this.inputStream.getPos();
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
index c2e7684cad6..9b9461c420a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -71,11 +72,16 @@ public class ITestS3ARequesterPays extends AbstractS3ATestBase {
inputStream.seek(0);
inputStream.readByte();
- // Verify > 1 call was made, so we're sure it is correctly configured for each request
- IOStatisticAssertions
- .assertThatStatisticCounter(inputStream.getIOStatistics(),
- StreamStatisticNames.STREAM_READ_OPENED)
- .isGreaterThan(1);
+ if (conf.getBoolean(PREFETCH_ENABLED_KEY, true)) {
+ // For S3PrefetchingInputStream, verify exactly one call was made
+ IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
+ StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1);
+ } else {
+ // For S3AInputStream, verify > 1 call was made,
+ // so we're sure it is correctly configured for each request
+ IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
+ StreamStatisticNames.STREAM_READ_OPENED).isGreaterThan(1);
+ }
// Check list calls work without error
fs.listFiles(requesterPaysPath.getParent(), false);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
index 3d7ee0882ef..3a2d1b1b09a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -33,6 +34,7 @@ import org.junit.Test;
import java.io.IOException;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_TOTAL_BYTES;
@@ -72,6 +74,7 @@ public class ITestS3AUnbuffer extends AbstractS3ATestBase {
IOStatisticsSnapshot iostats = new IOStatisticsSnapshot();
// Open file, read half the data, and then call unbuffer
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+ skipIfCannotUnbuffer(inputStream);
assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
int bytesToRead = 8;
readAndAssertBytesRead(inputStream, bytesToRead);
@@ -138,6 +141,7 @@ public class ITestS3AUnbuffer extends AbstractS3ATestBase {
Object streamStatsStr;
try {
inputStream = fs.open(dest);
+ skipIfCannotUnbuffer(inputStream);
streamStatsStr = demandStringifyIOStatisticsSource(inputStream);
LOG.info("initial stream statistics {}", streamStatsStr);
@@ -192,6 +196,12 @@ public class ITestS3AUnbuffer extends AbstractS3ATestBase {
return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen();
}
+ private void skipIfCannotUnbuffer(FSDataInputStream inputStream) {
+ if (!inputStream.hasCapability(StreamCapabilities.UNBUFFER)) {
+ skip("input stream does not support unbuffer");
+ }
+ }
+
/**
* Read the specified number of bytes from the given
* {@link FSDataInputStream} and assert that
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
index 010bc1c30b6..e3c6c002bff 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
@@ -169,11 +169,6 @@ public class TestS3InputStream extends AbstractHadoopTestBase {
EOFException.class,
FSExceptionMessages.NEGATIVE_SEEK,
() -> inputStream.seek(-1));
-
- ExceptionAsserts.assertThrows(
- EOFException.class,
- FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
- () -> inputStream.seek(fileSize + 1));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org