You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/07/12 04:53:20 UTC
[04/50] [abbrv] hadoop git commit: HADOOP-14596. AWS SDK 1.11+ aborts() on close() if > 0 bytes in stream; logs error. Contributed by Steve Loughran
HADOOP-14596. AWS SDK 1.11+ aborts() on close() if > 0 bytes in stream; logs error. Contributed by Steve Loughran
Change-Id: I49173bf6163796903d64594a8ca8a4bd26ad2bfc
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/72993b33
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/72993b33
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/72993b33
Branch: refs/heads/YARN-5972
Commit: 72993b33b704991f2a0bf743f31b164e58a2dabc
Parents: ec97519
Author: Mingliang Liu <li...@apache.org>
Authored: Thu Jun 29 17:00:25 2017 -0700
Committer: Mingliang Liu <li...@apache.org>
Committed: Thu Jun 29 17:07:52 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/fs/s3a/S3AInputStream.java | 26 +++++++++++++++++---
1 file changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/72993b33/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 7d322a5..b88b7c1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
@@ -78,7 +79,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
private final String key;
private final long contentLength;
private final String uri;
- public static final Logger LOG = S3AFileSystem.LOG;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3AInputStream.class);
private final S3AInstrumentation.InputStreamStatistics streamStatistics;
private S3AEncryptionMethods serverSideEncryptionAlgorithm;
private String serverSideEncryptionKey;
@@ -451,13 +453,27 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
// if the amount of data remaining in the current request is greater
// than the readahead value: abort.
long remaining = remainingInCurrentRequest();
+ LOG.debug("Closing stream {}: {}", reason,
+ forceAbort ? "abort" : "soft");
boolean shouldAbort = forceAbort || remaining > readahead;
if (!shouldAbort) {
try {
// clean close. This will read to the end of the stream,
// so, while cleaner, can be pathological on a multi-GB object
+
+ // explicitly drain the stream
+ long drained = 0;
+ while (wrappedStream.read() >= 0) {
+ drained++;
+ }
+ LOG.debug("Drained stream of {} bytes", drained);
+
+ // now close it
wrappedStream.close();
- streamStatistics.streamClose(false, remaining);
+ // this MUST come after the close, so that if the IO operations fail
+ // and an abort is triggered, the initial attempt's statistics
+ // aren't collected.
+ streamStatistics.streamClose(false, drained);
} catch (IOException e) {
// exception escalates to an abort
LOG.debug("When closing {} stream for {}", uri, reason, e);
@@ -467,13 +483,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
if (shouldAbort) {
// Abort, rather than just close, the underlying stream. Otherwise, the
// remaining object payload is read from S3 while closing the stream.
+ LOG.debug("Aborting stream");
wrappedStream.abort();
streamStatistics.streamClose(true, remaining);
}
- LOG.debug("Stream {} {}: {}; streamPos={}, nextReadPos={}," +
+ LOG.debug("Stream {} {}: {}; remaining={} streamPos={},"
+ + " nextReadPos={}," +
" request range {}-{} length={}",
uri, (shouldAbort ? "aborted" : "closed"), reason,
- pos, nextReadPos,
+ remaining, pos, nextReadPos,
contentRangeStart, contentRangeFinish,
length);
wrappedStream = null;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org