You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/10/19 13:38:26 UTC

[hadoop] branch trunk updated: HADOOP-18189 S3APrefetchingInputStream to support status probes when closed (#5036)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8aa04b0b24b HADOOP-18189 S3APrefetchingInputStream to support status probes when closed (#5036)
8aa04b0b24b is described below

commit 8aa04b0b24b1481b2b9785bd2e5e7165a866a9b0
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Wed Oct 19 06:38:11 2022 -0700

    HADOOP-18189 S3APrefetchingInputStream to support status probes when closed (#5036)
    
    
    Contributed by Viraj Jasani
---
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   |  1 +
 .../fs/s3a/prefetch/S3APrefetchingInputStream.java | 40 ++++++++++++----
 .../fs/s3a/ITestS3APrefetchingInputStream.java     | 54 ++++++++++++++++++++++
 3 files changed, 86 insertions(+), 9 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index be5b1799b35..4b50ab2c04b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -1164,6 +1164,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    */
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
+  @VisibleForTesting
   public S3AInputStreamStatistics getS3AStreamStatistics() {
     return streamStatistics;
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
index 76ef942ed65..f778f40b74c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
@@ -56,6 +57,21 @@ public class S3APrefetchingInputStream
    */
   private S3ARemoteInputStream inputStream;
 
+  /**
+   * To be only used by synchronized getPos().
+   */
+  private long lastReadCurrentPos = 0;
+
+  /**
+   * To be only used by getIOStatistics().
+   */
+  private IOStatistics ioStatistics = null;
+
+  /**
+   * To be only used by getS3AStreamStatistics().
+   */
+  private S3AInputStreamStatistics inputStreamStatistics = null;
+
   /**
    * Initializes a new instance of the {@code S3APrefetchingInputStream} class.
    *
@@ -115,14 +131,20 @@ public class S3APrefetchingInputStream
   }
 
   /**
-   * Gets the current position.
+   * Gets the current position. If the underlying S3 input stream is closed,
+   * it returns the last position read from the underlying stream. If the
+   * position was never read and the underlying input stream is closed,
+   * this returns 0.
    *
    * @return the current position.
    * @throws IOException if there is an IO error during this operation.
    */
   @Override
   public synchronized long getPos() throws IOException {
-    return isClosed() ? 0 : inputStream.getPos();
+    if (!isClosed()) {
+      lastReadCurrentPos = inputStream.getPos();
+    }
+    return lastReadCurrentPos;
   }
 
   /**
@@ -215,11 +237,12 @@ public class S3APrefetchingInputStream
    */
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
+  @VisibleForTesting
   public S3AInputStreamStatistics getS3AStreamStatistics() {
-    if (isClosed()) {
-      return null;
+    if (!isClosed()) {
+      inputStreamStatistics = inputStream.getS3AStreamStatistics();
     }
-    return inputStream.getS3AStreamStatistics();
+    return inputStreamStatistics;
   }
 
   /**
@@ -229,10 +252,10 @@ public class S3APrefetchingInputStream
    */
   @Override
   public IOStatistics getIOStatistics() {
-    if (isClosed()) {
-      return null;
+    if (!isClosed()) {
+      ioStatistics = inputStream.getIOStatistics();
     }
-    return inputStream.getIOStatistics();
+    return ioStatistics;
   }
 
   protected boolean isClosed() {
@@ -249,7 +272,6 @@ public class S3APrefetchingInputStream
 
   @Override
   public boolean seekToNewSource(long targetPos) throws IOException {
-    throwIfClosed();
     return false;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
index d597c05e1da..93611220f94 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
@@ -240,4 +242,56 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
     }
   }
 
+  @Test
+  public void testStatusProbesAfterClosingStream() throws Throwable {
+    describe("When the underlying input stream is closed, the prefetch input stream"
+        + " should still support some status probes");
+
+    byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
+    Path smallFile = methodPath();
+    ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);
+
+    FSDataInputStream in = getFileSystem().open(smallFile);
+
+    byte[] buffer = new byte[SMALL_FILE_SIZE];
+    in.read(buffer, 0, S_1K * 4);
+    in.seek(S_1K * 12);
+    in.read(buffer, 0, S_1K * 4);
+
+    long pos = in.getPos();
+    IOStatistics ioStats = in.getIOStatistics();
+    S3AInputStreamStatistics inputStreamStatistics =
+        ((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();
+
+    assertNotNull("Prefetching input IO stats should not be null", ioStats);
+    assertNotNull("Prefetching input stream stats should not be null", inputStreamStatistics);
+    assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
+        pos);
+
+    in.close();
+
+    // status probes after closing the input stream
+    long newPos = in.getPos();
+    IOStatistics newIoStats = in.getIOStatistics();
+    S3AInputStreamStatistics newInputStreamStatistics =
+        ((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();
+
+    assertNotNull("Prefetching input IO stats should not be null", newIoStats);
+    assertNotNull("Prefetching input stream stats should not be null", newInputStreamStatistics);
+    assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
+        newPos);
+
+    // compare status probes after closing of the stream with status probes done before
+    // closing the stream
+    assertEquals("Position retrieved through stream before and after closing should match", pos,
+        newPos);
+    assertEquals("IO stats retrieved through stream before and after closing should match", ioStats,
+        newIoStats);
+    assertEquals("Stream stats retrieved through stream before and after closing should match",
+        inputStreamStatistics, newInputStreamStatistics);
+
+    assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10));
+
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org