Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/12/19 12:24:33 UTC
[hadoop] branch branch-3.3 updated: HADOOP-17338. Intermittent
S3AInputStream failures: Premature end of Content-Length delimited message
body etc (#2497)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new adf6ca1 HADOOP-17338. Intermittent S3AInputStream failures: Premature end of Content-Length delimited message body etc (#2497)
adf6ca1 is described below
commit adf6ca18b4cfc5274aef04d96b4498690fea0cf8
Author: yzhangal <yj...@yahoo.com>
AuthorDate: Fri Dec 18 11:08:10 2020 -0800
HADOOP-17338. Intermittent S3AInputStream failures: Premature end of Content-Length delimited message body etc (#2497)
Yongjun Zhang <yo...@pinterest.com>
Change-Id: Ibbc6a39afb82de1208e6ed6a63ede224cc425466
---
.../org/apache/hadoop/fs/s3a/S3AInputStream.java | 57 ++++++++++++++++------
1 file changed, 42 insertions(+), 15 deletions(-)
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 542fe34..bd8adad 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -87,6 +87,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* set
*/
private volatile boolean closed;
+ /**
+ * wrappedStream is associated with an object (instance of S3Object). When
+ * the object is garbage collected, the associated wrappedStream will be
+ * closed. Keep a reference to this object to prevent the wrappedStream
+ * still in use from being closed unexpectedly due to garbage collection.
+ * See HADOOP-17338 for details.
+ */
+ private S3Object object;
private S3ObjectInputStream wrappedStream;
private final S3AReadOpContext context;
private final AmazonS3 client;
@@ -202,7 +210,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
String text = String.format("%s %s at %d",
operation, uri, targetPos);
changeTracker.maybeApplyConstraint(request);
- S3Object object = Invoker.once(text, uri,
+ object = Invoker.once(text, uri,
() -> client.getObject(request));
changeTracker.processResponse(object, operation,
@@ -430,9 +438,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
@Retries.OnceTranslated
private void onReadFailure(IOException ioe, int length, boolean forceAbort)
throws IOException {
-
- LOG.info("Got exception while trying to read from stream {}" +
- " trying to recover: " + ioe, uri);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got exception while trying to read from stream {}, " +
+ "client: {} object: {}, trying to recover: ",
+ uri, client, object, ioe);
+ } else {
+ LOG.info("Got exception while trying to read from stream {}, " +
+ "client: {} object: {}, trying to recover: " + ioe,
+ uri, client, object);
+ }
streamStatistics.readException();
reopen("failure recovery", pos, length, forceAbort);
}
@@ -550,14 +564,19 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
*/
@Retries.OnceRaw
private void closeStream(String reason, long length, boolean forceAbort) {
- if (isObjectStreamOpen()) {
+ if (!isObjectStreamOpen()) {
+ // stream is already closed
+ return;
+ }
+
+ // if the amount of data remaining in the current request is greater
+ // than the readahead value: abort.
+ long remaining = remainingInCurrentRequest();
+ LOG.debug("Closing stream {}: {}", reason,
+ forceAbort ? "abort" : "soft");
+ boolean shouldAbort = forceAbort || remaining > readahead;
- // if the amount of data remaining in the current request is greater
- // than the readahead value: abort.
- long remaining = remainingInCurrentRequest();
- LOG.debug("Closing stream {}: {}", reason,
- forceAbort ? "abort" : "soft");
- boolean shouldAbort = forceAbort || remaining > readahead;
+ try {
if (!shouldAbort) {
try {
// clean close. This will read to the end of the stream,
@@ -578,25 +597,33 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
streamStatistics.streamClose(false, drained);
} catch (Exception e) {
// exception escalates to an abort
- LOG.debug("When closing {} stream for {}", uri, reason, e);
+ LOG.debug("When closing {} stream for {}, will abort the stream",
+ uri, reason, e);
shouldAbort = true;
}
}
if (shouldAbort) {
// Abort, rather than just close, the underlying stream. Otherwise, the
// remaining object payload is read from S3 while closing the stream.
- LOG.debug("Aborting stream");
- wrappedStream.abort();
+ LOG.debug("Aborting stream {}", uri);
+ try {
+ wrappedStream.abort();
+ } catch (Exception e) {
+ LOG.warn("When aborting {} stream after failing to close it for {}",
+ uri, reason, e);
+ }
streamStatistics.streamClose(true, remaining);
}
LOG.debug("Stream {} {}: {}; remaining={} streamPos={},"
+ " nextReadPos={}," +
- " request range {}-{} length={}",
+ " request range {}-{} length={}",
uri, (shouldAbort ? "aborted" : "closed"), reason,
remaining, pos, nextReadPos,
contentRangeStart, contentRangeFinish,
length);
+ } finally {
wrappedStream = null;
+ object = null;
}
}