You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2018/07/11 23:10:41 UTC

[41/56] [abbrv] hadoop git commit: HADOOP-15541. [s3a] Shouldn't try to drain stream before aborting connection in case of timeout.

HADOOP-15541. [s3a] Shouldn't try to drain stream before aborting
connection in case of timeout.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d503f65b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d503f65b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d503f65b

Branch: refs/heads/HDFS-12943
Commit: d503f65b6689b19278ec2a0cf9da5a8762539de8
Parents: 705e2c1
Author: Sean Mackrory <ma...@apache.org>
Authored: Thu Jul 5 13:52:00 2018 -0600
Committer: Sean Mackrory <ma...@apache.org>
Committed: Tue Jul 10 17:52:57 2018 +0200

----------------------------------------------------------------------
 .../apache/hadoop/fs/s3a/S3AInputStream.java    | 24 +++++++++++++-------
 1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d503f65b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 440739d..68f98e4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 
@@ -155,11 +156,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    * @throws IOException on any failure to open the object
    */
   @Retries.OnceTranslated
-  private synchronized void reopen(String reason, long targetPos, long length)
-      throws IOException {
+  private synchronized void reopen(String reason, long targetPos, long length,
+          boolean forceAbort) throws IOException {
 
     if (wrappedStream != null) {
-      closeStream("reopen(" + reason + ")", contentRangeFinish, false);
+      closeStream("reopen(" + reason + ")", contentRangeFinish, forceAbort);
     }
 
     contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
@@ -324,7 +325,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
 
           //re-open at specific location if needed
           if (wrappedStream == null) {
-            reopen("read from new offset", targetPos, len);
+            reopen("read from new offset", targetPos, len, false);
           }
         });
   }
@@ -367,8 +368,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
             b = wrappedStream.read();
           } catch (EOFException e) {
             return -1;
+          } catch (SocketTimeoutException e) {
+            onReadFailure(e, 1, true);
+            b = wrappedStream.read();
           } catch (IOException e) {
-            onReadFailure(e, 1);
+            onReadFailure(e, 1, false);
             b = wrappedStream.read();
           }
           return b;
@@ -393,12 +397,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    * @throws IOException any exception thrown on the re-open attempt.
    */
   @Retries.OnceTranslated
-  private void onReadFailure(IOException ioe, int length) throws IOException {
+  private void onReadFailure(IOException ioe, int length, boolean forceAbort)
+          throws IOException {
 
     LOG.info("Got exception while trying to read from stream {}" +
         " trying to recover: " + ioe, uri);
     streamStatistics.readException();
-    reopen("failure recovery", pos, length);
+    reopen("failure recovery", pos, length, forceAbort);
   }
 
   /**
@@ -446,8 +451,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
           } catch (EOFException e) {
             // the base implementation swallows EOFs.
             return -1;
+          } catch (SocketTimeoutException e) {
+            onReadFailure(e, len, true);
+            bytes = wrappedStream.read(buf, off, len);
           } catch (IOException e) {
-            onReadFailure(e, len);
+            onReadFailure(e, len, false);
             bytes= wrappedStream.read(buf, off, len);
           }
           return bytes;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org