You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/05/04 18:46:57 UTC

[hadoop] branch feature-HADOOP-18028-s3a-prefetch updated: HADOOP-18175. fix test failures with prefetching s3a input stream (#4212)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch feature-HADOOP-18028-s3a-prefetch
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/feature-HADOOP-18028-s3a-prefetch by this push:
     new f38bbe2e9f6 HADOOP-18175. fix test failures with prefetching s3a input stream (#4212)
f38bbe2e9f6 is described below

commit f38bbe2e9f678b156e73ca881dc6230e368ecae4
Author: monthonk <47...@users.noreply.github.com>
AuthorDate: Wed May 4 19:46:39 2022 +0100

    HADOOP-18175. fix test failures with prefetching s3a input stream (#4212)
    
    
    Contributed by Monthon Klongklaew
---
 .../java/org/apache/hadoop/fs/s3a/read/S3File.java     |  1 +
 .../org/apache/hadoop/fs/s3a/read/S3InputStream.java   | 18 ++++++++----------
 .../hadoop/fs/s3a/read/S3PrefetchingInputStream.java   |  3 +--
 .../apache/hadoop/fs/s3a/ITestS3ARequesterPays.java    | 16 +++++++++++-----
 .../org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java     | 10 ++++++++++
 .../apache/hadoop/fs/s3a/read/TestS3InputStream.java   |  5 -----
 6 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
index 501186a2549..88854b87c81 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java
@@ -164,6 +164,7 @@ public class S3File implements Closeable {
     Validate.checkLessOrEqual(offset, "offset", size(), "size()");
     Validate.checkLessOrEqual(size, "size", size() - offset, "size() - offset");
 
+    streamStatistics.streamOpened();
     final GetObjectRequest request = client.newGetRequest(this.s3Attributes.getKey())
         .withRange(offset, offset + size - 1);
     this.changeTracker.maybeApplyConstraint(request);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
index 0fa6e33200b..00d5fbc367d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java
@@ -254,6 +254,10 @@ public abstract class S3InputStream
   public int read() throws IOException {
     this.throwIfClosed();
 
+    if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
+      return -1;
+    }
+
     if (!ensureCurrentBuffer()) {
       return -1;
     }
@@ -296,6 +300,10 @@ public abstract class S3InputStream
       return 0;
     }
 
+    if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
+      return -1;
+    }
+
     if (!ensureCurrentBuffer()) {
       return -1;
     }
@@ -427,18 +435,8 @@ public abstract class S3InputStream
   }
 
   protected void throwIfInvalidSeek(long pos) throws EOFException {
-    long fileSize = this.s3File.size();
     if (pos < 0) {
       throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
-    } else {
-      if (fileSize == 0 && pos == 0) {
-        // Do nothing. Valid combination.
-        return;
-      }
-
-      if (pos >= fileSize) {
-        throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
-      }
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
index c874a8c37b8..0f5834da4cc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java
@@ -103,8 +103,7 @@ public class S3PrefetchingInputStream
    */
   @Override
   public synchronized long getPos() throws IOException {
-    this.throwIfClosed();
-    return this.inputStream.getPos();
+    return this.isClosed() ? 0 : this.inputStream.getPos();
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
index c2e7684cad6..9b9461c420a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 
 import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
@@ -71,11 +72,16 @@ public class ITestS3ARequesterPays extends AbstractS3ATestBase {
       inputStream.seek(0);
       inputStream.readByte();
 
-      // Verify > 1 call was made, so we're sure it is correctly configured for each request
-      IOStatisticAssertions
-          .assertThatStatisticCounter(inputStream.getIOStatistics(),
-              StreamStatisticNames.STREAM_READ_OPENED)
-          .isGreaterThan(1);
+      if (conf.getBoolean(PREFETCH_ENABLED_KEY, true)) {
+        // For S3PrefetchingInputStream, verify a call was made
+        IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
+            StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1);
+      } else {
+        // For S3AInputStream, verify > 1 call was made,
+        // so we're sure it is correctly configured for each request
+        IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
+            StreamStatisticNames.STREAM_READ_OPENED).isGreaterThan(1);
+      }
 
       // Check list calls work without error
       fs.listFiles(requesterPaysPath.getParent(), false);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
index 3d7ee0882ef..3a2d1b1b09a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -33,6 +34,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_TOTAL_BYTES;
@@ -72,6 +74,7 @@ public class ITestS3AUnbuffer extends AbstractS3ATestBase {
     IOStatisticsSnapshot iostats = new IOStatisticsSnapshot();
     // Open file, read half the data, and then call unbuffer
     try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      skipIfCannotUnbuffer(inputStream);
       assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
       int bytesToRead = 8;
       readAndAssertBytesRead(inputStream, bytesToRead);
@@ -138,6 +141,7 @@ public class ITestS3AUnbuffer extends AbstractS3ATestBase {
     Object streamStatsStr;
     try {
       inputStream = fs.open(dest);
+      skipIfCannotUnbuffer(inputStream);
       streamStatsStr = demandStringifyIOStatisticsSource(inputStream);
 
       LOG.info("initial stream statistics {}", streamStatsStr);
@@ -192,6 +196,12 @@ public class ITestS3AUnbuffer extends AbstractS3ATestBase {
     return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen();
   }
 
+  private void skipIfCannotUnbuffer(FSDataInputStream inputStream) {
+    if (!inputStream.hasCapability(StreamCapabilities.UNBUFFER)) {
+      skip("input stream does not support unbuffer");
+    }
+  }
+
   /**
    * Read the specified number of bytes from the given
    * {@link FSDataInputStream} and assert that
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
index 010bc1c30b6..e3c6c002bff 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
@@ -169,11 +169,6 @@ public class TestS3InputStream extends AbstractHadoopTestBase {
         EOFException.class,
         FSExceptionMessages.NEGATIVE_SEEK,
         () -> inputStream.seek(-1));
-
-    ExceptionAsserts.assertThrows(
-        EOFException.class,
-        FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
-        () -> inputStream.seek(fileSize + 1));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org