You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/10/10 12:31:48 UTC

[hadoop] 02/02: HADOOP-18460. checkIfVectoredIOStopped before populating the buffers (#4986)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 77cb778a4462a2da26663b431e9e179e9f6b3208
Author: Mukund Thakur <mt...@cloudera.com>
AuthorDate: Mon Oct 10 15:47:45 2022 +0530

    HADOOP-18460. checkIfVectoredIOStopped before populating the buffers (#4986)
    
    Contributed by Mukund Thakur
---
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   | 43 ++++++++++++++--------
 1 file changed, 28 insertions(+), 15 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 39d41f5ffd2..be5b1799b35 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -910,21 +910,15 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
   private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
                                                   IntFunction<ByteBuffer> allocate) {
     LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr);
-    // This reference is must be kept till all buffers are populated as this is a
+    // This reference must be kept till all buffers are populated as this is a
     // finalizable object which closes the internal stream when gc triggers.
     S3Object objectRange = null;
     S3ObjectInputStream objectContent = null;
     try {
-      checkIfVectoredIOStopped();
-      final String operationName = "readCombinedFileRange";
-      objectRange = getS3Object(operationName,
+      objectRange = getS3ObjectAndValidateNotNull("readCombinedFileRange",
               combinedFileRange.getOffset(),
               combinedFileRange.getLength());
       objectContent = objectRange.getObjectContent();
-      if (objectContent == null) {
-        throw new PathIOException(uri,
-                "Null IO stream received during " + operationName);
-      }
       populateChildBuffers(combinedFileRange, objectContent, allocate);
     } catch (Exception ex) {
       LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex);
@@ -1019,19 +1013,15 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
    */
   private void readSingleRange(FileRange range, ByteBuffer buffer) {
     LOG.debug("Start reading range {} from path {} ", range, pathStr);
+    // This reference must be kept till all buffers are populated as this is a
+    // finalizable object which closes the internal stream when gc triggers.
     S3Object objectRange = null;
     S3ObjectInputStream objectContent = null;
     try {
-      checkIfVectoredIOStopped();
       long position = range.getOffset();
       int length = range.getLength();
-      final String operationName = "readRange";
-      objectRange = getS3Object(operationName, position, length);
+      objectRange = getS3ObjectAndValidateNotNull("readSingleRange", position, length);
       objectContent = objectRange.getObjectContent();
-      if (objectContent == null) {
-        throw new PathIOException(uri,
-                "Null IO stream received during " + operationName);
-      }
       populateBuffer(length, buffer, objectContent);
       range.getData().complete(buffer);
     } catch (Exception ex) {
@@ -1043,6 +1033,29 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     LOG.debug("Finished reading range {} from path {} ", range, pathStr);
   }
 
+  /**
+   * Get the S3 object from the S3 server for a specified range.
+   * Also checks if the vectored IO operation has been stopped before and after
+   * the HTTP GET request so that we don't waste time populating the buffers.
+   * @param operationName name of the operation for which get object on S3 is called.
+   * @param position position of the object to be read from S3.
+   * @param length length from position of the object to be read from S3.
+   * @return the S3 object, whose content stream has been checked to be non-null.
+   * @throws IOException on failure; PathIOException if the object content stream is null.
+   */
+  private S3Object getS3ObjectAndValidateNotNull(final String operationName,
+                                                 final long position,
+                                                 final int length) throws IOException {
+    checkIfVectoredIOStopped();
+    S3Object objectRange = getS3Object(operationName, position, length);
+    if (objectRange.getObjectContent() == null) {
+      throw new PathIOException(uri,
+              "Null IO stream received during " + operationName);
+    }
+    checkIfVectoredIOStopped(); // NOTE(review): objectRange is not closed here if this throws
+    return objectRange;
+  }
+
   /**
    * Populates the buffer with data from objectContent
    * till length. Handles both direct and heap byte buffers.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org