Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/12/19 12:24:33 UTC

[hadoop] branch branch-3.3 updated: HADOOP-17338. Intermittent S3AInputStream failures: Premature end of Content-Length delimited message body etc (#2497)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new adf6ca1  HADOOP-17338. Intermittent S3AInputStream failures: Premature end of Content-Length delimited message body etc (#2497)
adf6ca1 is described below

commit adf6ca18b4cfc5274aef04d96b4498690fea0cf8
Author: yzhangal <yj...@yahoo.com>
AuthorDate: Fri Dec 18 11:08:10 2020 -0800

    HADOOP-17338. Intermittent S3AInputStream failures: Premature end of Content-Length delimited message body etc (#2497)
    
    Yongjun Zhang <yo...@pinterest.com>
    
    Change-Id: Ibbc6a39afb82de1208e6ed6a63ede224cc425466
---
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   | 57 ++++++++++++++++------
 1 file changed, 42 insertions(+), 15 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 542fe34..bd8adad 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -87,6 +87,14 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    * set
    */
   private volatile boolean closed;
+  /**
+   * wrappedStream is associated with an object (instance of S3Object). When
+   * the object is garbage collected, the associated wrappedStream will be
+   * closed. Keep a reference to this object to prevent the wrappedStream
+   * still in use from being closed unexpectedly due to garbage collection.
+   * See HADOOP-17338 for details.
+   */
+  private S3Object object;
   private S3ObjectInputStream wrappedStream;
   private final S3AReadOpContext context;
   private final AmazonS3 client;
@@ -202,7 +210,7 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     String text = String.format("%s %s at %d",
         operation, uri, targetPos);
     changeTracker.maybeApplyConstraint(request);
-    S3Object object = Invoker.once(text, uri,
+    object = Invoker.once(text, uri,
         () -> client.getObject(request));
 
     changeTracker.processResponse(object, operation,
@@ -430,9 +438,15 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   @Retries.OnceTranslated
   private void onReadFailure(IOException ioe, int length, boolean forceAbort)
           throws IOException {
-
-    LOG.info("Got exception while trying to read from stream {}" +
-        " trying to recover: " + ioe, uri);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got exception while trying to read from stream {}, " +
+          "client: {} object: {}, trying to recover: ",
+          uri, client, object, ioe);
+    } else {
+      LOG.info("Got exception while trying to read from stream {}, " +
+          "client: {} object: {}, trying to recover: " + ioe,
+          uri, client, object);
+    }
     streamStatistics.readException();
     reopen("failure recovery", pos, length, forceAbort);
   }
@@ -550,14 +564,19 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    */
   @Retries.OnceRaw
   private void closeStream(String reason, long length, boolean forceAbort) {
-    if (isObjectStreamOpen()) {
+    if (!isObjectStreamOpen()) {
+      // stream is already closed
+      return;
+    }
+
+    // if the amount of data remaining in the current request is greater
+    // than the readahead value: abort.
+    long remaining = remainingInCurrentRequest();
+    LOG.debug("Closing stream {}: {}", reason,
+        forceAbort ? "abort" : "soft");
+    boolean shouldAbort = forceAbort || remaining > readahead;
 
-      // if the amount of data remaining in the current request is greater
-      // than the readahead value: abort.
-      long remaining = remainingInCurrentRequest();
-      LOG.debug("Closing stream {}: {}", reason,
-          forceAbort ? "abort" : "soft");
-      boolean shouldAbort = forceAbort || remaining > readahead;
+    try {
       if (!shouldAbort) {
         try {
           // clean close. This will read to the end of the stream,
@@ -578,25 +597,33 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
           streamStatistics.streamClose(false, drained);
         } catch (Exception e) {
           // exception escalates to an abort
-          LOG.debug("When closing {} stream for {}", uri, reason, e);
+          LOG.debug("When closing {} stream for {}, will abort the stream",
+              uri, reason, e);
           shouldAbort = true;
         }
       }
       if (shouldAbort) {
         // Abort, rather than just close, the underlying stream.  Otherwise, the
         // remaining object payload is read from S3 while closing the stream.
-        LOG.debug("Aborting stream");
-        wrappedStream.abort();
+        LOG.debug("Aborting stream {}", uri);
+        try {
+          wrappedStream.abort();
+        } catch (Exception e) {
+          LOG.warn("When aborting {} stream after failing to close it for {}",
+              uri, reason, e);
+        }
         streamStatistics.streamClose(true, remaining);
       }
       LOG.debug("Stream {} {}: {}; remaining={} streamPos={},"
               + " nextReadPos={}," +
-          " request range {}-{} length={}",
+              " request range {}-{} length={}",
           uri, (shouldAbort ? "aborted" : "closed"), reason,
           remaining, pos, nextReadPos,
           contentRangeStart, contentRangeFinish,
           length);
+    } finally {
       wrappedStream = null;
+      object = null;
     }
   }
 
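The javadoc added in the first hunk captures the root cause: S3AInputStream previously held only the S3ObjectInputStream, so the parent S3Object could become unreachable mid-read and be garbage collected, closing the wrapped stream underneath an in-progress read and surfacing as "Premature end of Content-Length delimited message body". The patch pins the S3Object in a field for the lifetime of the stream. A minimal sketch of the same pattern (the class and member names below are illustrative, not the actual S3AInputStream API):

    import java.io.IOException;
    import java.io.InputStream;

    // Illustrative only: "owner" stands in for S3Object, "in" for
    // S3ObjectInputStream. While the owner field is set, the response
    // object stays strongly reachable, so garbage collection cannot
    // trigger its cleanup and close the stream mid-read.
    class PinnedStreamReader {
      private Object owner;        // strong reference pinning the response object
      private InputStream in;      // the wrapped data stream

      void open(Object responseObject, InputStream stream) {
        this.owner = responseObject;
        this.in = stream;
      }

      int read() throws IOException {
        return in.read();          // safe: owner cannot be collected here
      }

      void close() throws IOException {
        try {
          in.close();
        } finally {
          // Release both references together, mirroring the patch's
          // finally block in closeStream().
          in = null;
          owner = null;
        }
      }
    }

In the diff itself, this is the new "private S3Object object" field and the change from a local "S3Object object = Invoker.once(...)" to an assignment into that field.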

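The onReadFailure() change logs the full stack trace only at debug level: with SLF4J, passing the exception as the final argument after the placeholder arguments logs its stack trace, while concatenating it into the message string, as the info branch does, prints only its toString(). A small sketch of the two forms (the LogSketch class is hypothetical):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LogSketch {
      private static final Logger LOG = LoggerFactory.getLogger(LogSketch.class);

      void onFailure(String uri, Exception ioe) {
        if (LOG.isDebugEnabled()) {
          // Exception as the final argument: SLF4J logs its stack trace.
          LOG.debug("Read failed on {}, trying to recover:", uri, ioe);
        } else {
          // Exception concatenated into the message: only toString() appears.
          LOG.info("Read failed on {}, trying to recover: " + ioe, uri);
        }
      }
    }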

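The closeStream() rewrite keeps the existing policy (drain and close when the bytes remaining in the request are within the readahead window, otherwise abort the connection) but adds two safeguards: an exception from abort() is now caught and logged rather than propagated, and wrappedStream and object are nulled in a finally block so both references are released even when abort() throws. A compact sketch of that control flow, with a hypothetical AbortableStream standing in for the SDK's S3ObjectInputStream:

    import java.io.IOException;

    interface AbortableStream {
      void close() throws IOException;  // drains remaining bytes, pools the connection
      void abort();                     // drops the HTTP connection immediately
    }

    class StreamCloser {
      private AbortableStream wrapped;
      private Object owner;

      void closeStream(boolean forceAbort, long remaining, long readahead) {
        if (wrapped == null) {
          return;                       // stream is already closed
        }
        // Abort when asked to, or when draining would cost more than a
        // readahead window's worth of bytes.
        boolean shouldAbort = forceAbort || remaining > readahead;
        try {
          if (!shouldAbort) {
            try {
              wrapped.close();
            } catch (Exception e) {
              shouldAbort = true;       // a failed close escalates to an abort
            }
          }
          if (shouldAbort) {
            try {
              wrapped.abort();
            } catch (Exception e) {
              // log-and-continue in the real code; cleanup below must still run
            }
          }
        } finally {
          wrapped = null;               // always release the stream...
          owner = null;                 // ...and the pinned response object
        }
      }
    }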