You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2018/07/11 23:10:41 UTC
[41/56] [abbrv] hadoop git commit: HADOOP-15541. [s3a] Shouldn't try
to drain stream before aborting connection in case of timeout.
HADOOP-15541. [s3a] S3AInputStream should not try to drain the stream before
aborting the connection when a read fails with a socket timeout.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d503f65b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d503f65b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d503f65b
Branch: refs/heads/HDFS-12943
Commit: d503f65b6689b19278ec2a0cf9da5a8762539de8
Parents: 705e2c1
Author: Sean Mackrory <ma...@apache.org>
Authored: Thu Jul 5 13:52:00 2018 -0600
Committer: Sean Mackrory <ma...@apache.org>
Committed: Tue Jul 10 17:52:57 2018 +0200
----------------------------------------------------------------------
.../apache/hadoop/fs/s3a/S3AInputStream.java | 24 +++++++++++++-------
1 file changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d503f65b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 440739d..68f98e4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
+import java.net.SocketTimeoutException;
import static org.apache.commons.lang3.StringUtils.isNotEmpty;
@@ -155,11 +156,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* @throws IOException on any failure to open the object
*/
@Retries.OnceTranslated
- private synchronized void reopen(String reason, long targetPos, long length)
- throws IOException {
+ private synchronized void reopen(String reason, long targetPos, long length,
+ boolean forceAbort) throws IOException {
if (wrappedStream != null) {
- closeStream("reopen(" + reason + ")", contentRangeFinish, false);
+ closeStream("reopen(" + reason + ")", contentRangeFinish, forceAbort);
}
contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
@@ -324,7 +325,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
//re-open at specific location if needed
if (wrappedStream == null) {
- reopen("read from new offset", targetPos, len);
+ reopen("read from new offset", targetPos, len, false);
}
});
}
@@ -367,8 +368,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
b = wrappedStream.read();
} catch (EOFException e) {
return -1;
+ } catch (SocketTimeoutException e) {
+ onReadFailure(e, 1, true);
+ b = wrappedStream.read();
} catch (IOException e) {
- onReadFailure(e, 1);
+ onReadFailure(e, 1, false);
b = wrappedStream.read();
}
return b;
@@ -393,12 +397,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* @throws IOException any exception thrown on the re-open attempt.
*/
@Retries.OnceTranslated
- private void onReadFailure(IOException ioe, int length) throws IOException {
+ private void onReadFailure(IOException ioe, int length, boolean forceAbort)
+ throws IOException {
LOG.info("Got exception while trying to read from stream {}" +
" trying to recover: " + ioe, uri);
streamStatistics.readException();
- reopen("failure recovery", pos, length);
+ reopen("failure recovery", pos, length, forceAbort);
}
/**
@@ -446,8 +451,11 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
} catch (EOFException e) {
// the base implementation swallows EOFs.
return -1;
+ } catch (SocketTimeoutException e) {
+ onReadFailure(e, len, true);
+ bytes = wrappedStream.read(buf, off, len);
} catch (IOException e) {
- onReadFailure(e, len);
+ onReadFailure(e, len, false);
bytes= wrappedStream.read(buf, off, len);
}
return bytes;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org