You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2018/07/11 13:55:43 UTC

hadoop git commit: HADOOP-15541. [s3a] Shouldn't try to drain stream before aborting connection in case of timeout. Contributed by Sean Mackrory.

Repository: hadoop
Updated Branches:
  refs/heads/branch-3.1 2aaad4000 -> caf38532f


HADOOP-15541. [s3a] Shouldn't try to drain stream before aborting
connection in case of timeout. Contributed by Sean Mackrory.

(cherry picked from commit d503f65b6689b19278ec2a0cf9da5a8762539de8)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/caf38532
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/caf38532
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/caf38532

Branch: refs/heads/branch-3.1
Commit: caf38532f3f3eafb4c874a6debddaad2fb2aa201
Parents: 2aaad40
Author: Steve Loughran <st...@apache.org>
Authored: Wed Jul 11 14:55:11 2018 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Wed Jul 11 14:55:11 2018 +0100

----------------------------------------------------------------------
 .../apache/hadoop/fs/s3a/S3AInputStream.java    | 24 +++++++++++++-------
 1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/caf38532/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index c54d3e26..91a2d9d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 
@@ -155,11 +156,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    * @throws IOException on any failure to open the object
    */
   @Retries.OnceTranslated
-  private synchronized void reopen(String reason, long targetPos, long length)
-      throws IOException {
+  private synchronized void reopen(String reason, long targetPos, long length,
+          boolean forceAbort) throws IOException {
 
     if (wrappedStream != null) {
-      closeStream("reopen(" + reason + ")", contentRangeFinish, false);
+      closeStream("reopen(" + reason + ")", contentRangeFinish, forceAbort);
     }
 
     contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
@@ -324,7 +325,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
 
           //re-open at specific location if needed
           if (wrappedStream == null) {
-            reopen("read from new offset", targetPos, len);
+            reopen("read from new offset", targetPos, len, false);
           }
         });
   }
@@ -367,8 +368,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
             b = wrappedStream.read();
           } catch (EOFException e) {
             return -1;
+          } catch (SocketTimeoutException e) {
+            onReadFailure(e, 1, true);
+            b = wrappedStream.read();
           } catch (IOException e) {
-            onReadFailure(e, 1);
+            onReadFailure(e, 1, false);
             b = wrappedStream.read();
           }
           return b;
@@ -393,12 +397,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    * @throws IOException any exception thrown on the re-open attempt.
    */
   @Retries.OnceTranslated
-  private void onReadFailure(IOException ioe, int length) throws IOException {
+  private void onReadFailure(IOException ioe, int length, boolean forceAbort)
+          throws IOException {
 
     LOG.info("Got exception while trying to read from stream {}" +
         " trying to recover: " + ioe, uri);
     streamStatistics.readException();
-    reopen("failure recovery", pos, length);
+    reopen("failure recovery", pos, length, forceAbort);
   }
 
   /**
@@ -446,8 +451,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
           } catch (EOFException e) {
             // the base implementation swallows EOFs.
             return -1;
+          } catch (SocketTimeoutException e) {
+            onReadFailure(e, len, true);
+            bytes = wrappedStream.read(buf, off, len);
           } catch (IOException e) {
-            onReadFailure(e, len);
+            onReadFailure(e, len, false);
             bytes= wrappedStream.read(buf, off, len);
           }
           return bytes;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org