Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/06/09 15:46:09 UTC

[GitHub] [hadoop] ahmarsuhail commented on a diff in pull request #4386: HADOOP-18231. Fixes failing tests & drain stream async.

ahmarsuhail commented on code in PR #4386:
URL: https://github.com/apache/hadoop/pull/4386#discussion_r893675594


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java:
##########
@@ -214,7 +213,83 @@ void close(InputStream inputStream) {
       this.s3Objects.remove(inputStream);
     }
 
+    if (numRemainingBytes <= this.context.getAsyncDrainThreshold()) {
+      // don't bother with async io.
+      drain(false, "close() operation", numRemainingBytes, obj, inputStream);
+    } else {
+      LOG.debug("initiating asynchronous drain of {} bytes", numRemainingBytes);
+      // schedule an async drain/abort with references to the fields so they
+      // can be reused
+      client.submit(() -> drain(false, "close() operation", numRemainingBytes, obj, inputStream));
+    }
+  }
+
+  /**
+   * drain the stream. This method is intended to be
+   * used directly or asynchronously, and measures the
+   * duration of the operation in the stream statistics.
+   *
+   * @param shouldAbort   force an abort; used if explicitly requested.
+   * @param reason        reason for stream being closed; used in messages
+   * @param remaining     remaining bytes
+   * @param requestObject http request object;
+   * @param inputStream   stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drain(final boolean shouldAbort, final String reason, final long remaining,
+      final S3Object requestObject, final InputStream inputStream) {
+
+    try {
+      return invokeTrackingDuration(streamStatistics.initiateInnerStreamClose(shouldAbort),
+          () -> drainOrAbortHttpStream(shouldAbort, reason, remaining, requestObject, inputStream));
+    } catch (IOException e) {
+      // this is only here because invokeTrackingDuration() has it in its
+      // signature
+      return shouldAbort;
+    }
+  }
+
+  /**
+   * Drain or abort the inner stream.
+   * Exceptions are swallowed.
+   * If a close() is attempted and fails, the operation escalates to
+   * an abort.
+   *
+   * @param shouldAbort   force an abort; used if explicitly requested.
+   * @param reason        reason for stream being closed; used in messages
+   * @param remaining     remaining bytes
+   * @param requestObject http request object
+   * @param inputStream   stream to close.
+   * @return was the stream aborted?
+   */
+  private boolean drainOrAbortHttpStream(boolean shouldAbort, final String reason,
+      final long remaining, final S3Object requestObject, final InputStream inputStream) {
+
+    if (!shouldAbort && remaining > 0) {
+      try {
+        long drained = 0;
+        byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
+        while (true) {
+          final int count = inputStream.read(buffer);
+          if (count < 0) {
+            // no more data is left
+            break;
+          }
+          drained += count;
+        }
+        LOG.debug("Drained stream of {} bytes", drained);
+      } catch (Exception e) {
+        // exception escalates to an abort
+        LOG.debug("When closing {} stream for {}, will abort the stream", uri, reason, e);
+        shouldAbort = true;
+      }
+    }
     Io.closeIgnoringIoException(inputStream);
-    Io.closeIgnoringIoException(obj);
+    Io.closeIgnoringIoException(requestObject);

Review Comment:
   `Io` was a new class added as part of the initial prefetching PR. I've removed it and replaced its usages with `cleanupWithLogger`.
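
For readers unfamiliar with the pattern, here is a rough sketch of what a "close and log, never throw" helper does. It assumes that `cleanupWithLogger` above refers to Hadoop's `IOUtils.cleanupWithLogger(Logger, Closeable...)`; the `CloseQuietly` class, its `closeAll` method, the `Consumer<String>` stand-in for a logger, and the returned failure count are all hypothetical names and choices made for illustration, not the Hadoop implementation.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.function.Consumer;

/**
 * Hypothetical stand-in for a cleanup helper: closes every non-null
 * argument and reports failures to a logger instead of throwing.
 */
final class CloseQuietly {
  private CloseQuietly() {
  }

  /**
   * @param logger     receives one message per failed close; may be null
   * @param closeables resources to close; null entries are skipped
   * @return number of resources whose close() threw
   */
  static int closeAll(Consumer<String> logger, Closeable... closeables) {
    int failures = 0;
    if (closeables == null) {
      return failures;
    }
    for (Closeable c : closeables) {
      if (c == null) {
        continue;
      }
      try {
        c.close();
      } catch (Exception e) {
        // Swallow the failure so the remaining resources still get closed.
        failures++;
        if (logger != null) {
          logger.accept("Exception in closing " + c + ": " + e);
        }
      }
    }
    return failures;
  }

  public static void main(String[] args) {
    Closeable fine = () -> { };
    Closeable broken = () -> {
      throw new IOException("boom");
    };
    // The null entry is skipped and the broken stream does not stop the loop.
    int failures = closeAll(System.out::println, fine, null, broken);
    System.out.println("failures = " + failures);
  }
}
```

With a helper of this shape, the two `Io.closeIgnoringIoException(...)` calls in the hunk collapse into a single call that takes both `inputStream` and `requestObject`, and a failure closing the first no longer prevents the second from being closed.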



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: common-issues-help@hadoop.apache.org