You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@iceberg.apache.org by dw...@apache.org on 2023/04/04 15:31:58 UTC

[iceberg] branch master updated: AWS: abort S3 input stream on close if not EOS (#7262)

This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 49e930877a AWS: abort S3 input stream on close if not EOS (#7262)
49e930877a is described below

commit 49e930877a16bce2df51d6e51b737d2969208644
Author: Bryan Keller <br...@gmail.com>
AuthorDate: Tue Apr 4 08:31:51 2023 -0700

    AWS: abort S3 input stream on close if not EOS (#7262)
    
    * AWS: abort S3 input stream on close if not EOS
    
    * Close the stream for backwards compatibility
    
    * undo unrelated change
    
    * add trace log
    
    * comment update
    
    * logger updates
    
    * handle connection closed exception
---
 .../org/apache/iceberg/aws/s3/S3InputStream.java   | 25 +++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
index 1a45ad0d0c..7d83cea3f1 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.http.Abortable;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
@@ -196,7 +197,29 @@ class S3InputStream extends SeekableInputStream implements RangeReadable {
 
   private void closeStream() throws IOException {
+    // Releases the underlying stream, if one is open: abort it first when it was not read to
+    // the end (see abortStream()), then close it and clear the field.
     if (stream != null) {
-      stream.close();
+      // if we aren't at the end of the stream, and the stream is abortable, then
+      // call abort() so we don't read the remaining data with the Apache HTTP client
+      abortStream();
+      try {
+        stream.close();
+      } catch (IOException e) {
+        // the Apache HTTP client will throw a ConnectionClosedException
+        // when closing an aborted stream, which is expected
+        // The exception is matched by simple class name rather than by type, presumably so
+        // this class needs no compile-time dependency on the Apache HTTP client.
+        if (!e.getClass().getSimpleName().equals("ConnectionClosedException")) {
+          throw e;
+        }
+      }
+      // NOTE(review): this line is reached only if close() succeeded or threw a
+      // ConnectionClosedException. Any other IOException propagates with `stream` still set,
+      // so the next closeStream() call retries abort/close on the same broken stream.
+      // Consider clearing the field in a finally block.
+      stream = null;
+    }
+  }
+
+  /**
+   * Best-effort abort of the underlying stream, invoked by closeStream() just before it closes
+   * the stream.
+   *
+   * <p>Reads a single byte to probe for end-of-stream. If the stream implements {@link Abortable}
+   * and that read does not return -1 (data remains), the stream is aborted. This means the
+   * subsequent close() does not have to read the remaining data. The probed byte is discarded,
+   * which is harmless because the stream is closed immediately afterwards.
+   *
+   * <p>Any exception, including one thrown by the probing read, is logged at WARN and swallowed,
+   * so the caller always proceeds to close the stream. NOTE(review): assuming {@code stream} is a
+   * java.io.InputStream (its declared type is not visible here), the probing {@code read()} may
+   * block until a byte is available or end-of-stream is detected.
+   */
+  private void abortStream() {
+    try {
+      // short-circuit: read() is attempted only for Abortable streams
+      if (stream instanceof Abortable && stream.read() != -1) {
+        ((Abortable) stream).abort();
+      }
+    } catch (Exception e) {
+      LOG.warn("An error occurred while aborting the stream", e);
     }
   }