You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/09/30 15:43:36 UTC

[GitHub] [hadoop] steveloughran commented on a diff in pull request #4955: HADOOP-18378. Implement lazy seek in S3A prefetching.

steveloughran commented on code in PR #4955:
URL: https://github.com/apache/hadoop/pull/4955#discussion_r984708193


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java:
##########
@@ -139,36 +100,45 @@ protected boolean ensureCurrentBuffer() throws IOException {
       return false;
     }
 
-    if (getFilePosition().isValid() && getFilePosition()
-        .buffer()
-        .hasRemaining()) {
-      return true;
+    long readPos = getNextReadPos();
+    if (!getBlockData().isValidOffset(readPos)) {
+      return false;
     }
 
-    long readPos;
-    int prefetchCount;
-
-    if (getFilePosition().isValid()) {
-      // A sequential read results in a prefetch.
-      readPos = getFilePosition().absolute();
-      prefetchCount = numBlocksToPrefetch;
-    } else {
-      // A seek invalidates the current position.
-      // We prefetch only 1 block immediately after a seek operation.
-      readPos = getSeekTargetPos();
-      prefetchCount = 1;
-    }
+    // Determine whether this is an out of order read.
+    FilePosition filePosition = getFilePosition();
+    boolean outOfOrderRead = !filePosition.setAbsolute(readPos);
 
-    if (!getBlockData().isValidOffset(readPos)) {
-      return false;
+    if (!outOfOrderRead && filePosition.buffer().hasRemaining()) {
+      // Use the current buffer.
+      return true;
     }
 
-    if (getFilePosition().isValid()) {
-      if (getFilePosition().bufferFullyRead()) {
-        blockManager.release(getFilePosition().data());
+    if(filePosition.isValid()) {

Review Comment:
   nit: add a space



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java:
##########
@@ -121,8 +121,8 @@ public void testRead() throws Exception {
 
   private void testReadHelper(S3ARemoteInputStream inputStream, int bufferSize)
       throws Exception {
-    assertEquals(bufferSize, inputStream.available());
     assertEquals(0, inputStream.read());
+    assertEquals(bufferSize - 1, inputStream.available());

Review Comment:
   can you add string of what is being tested. the assertEquals prints the values, but it is important to know what is being validated. same below.
   
   actually, given the #of calls, what about a method `assertAvailable(InputStream, int)` to do the probe everywhere



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java:
##########
@@ -136,6 +136,40 @@ public void testReadLargeFileFully() throws Throwable {
         StoreStatisticNames.ACTION_EXECUTOR_ACQUIRED + SUFFIX_MAX).isGreaterThan(0);
   }
 
+  @Test
+  public void testReadLargeFileFullyLazySeek() throws Throwable {
+    describe("read a large file using readFully(position,buffer,offset,length),"
+        + " uses S3ACachingInputStream");
+    IOStatistics ioStats;
+    openFS();
+
+    try (FSDataInputStream in = largeFileFS.open(largeFile)) {
+      ioStats = in.getIOStatistics();
+
+      byte[] buffer = new byte[S_1M * 10];
+      long bytesRead = 0;
+
+      while (bytesRead < largeFileSize) {
+        in.readFully(bytesRead, buffer, 0, (int) Math.min(buffer.length,
+            largeFileSize - bytesRead));
+        bytesRead += buffer.length;
+        // Blocks are fully read, no blocks should be cached
+        verifyStatisticGaugeValue(ioStats, StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE,

Review Comment:
   add static import for StreamStatisticNames to keep code a bit less verbose



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestFilePosition.java:
##########
@@ -192,4 +193,23 @@ public void testBufferStats() {
     }
     assertTrue(pos.bufferFullyRead());
   }
+
+  @Test
+  public void testBounds() {
+    int bufferSize = 8;
+    long fileSize = bufferSize;
+
+    ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+    BufferData data = new BufferData(0, buffer);
+    FilePosition pos = new FilePosition(fileSize, bufferSize);
+
+    long eofOffset = fileSize;
+    pos.setData(data, 0, eofOffset);
+
+    assertTrue(pos.isWithinCurrentBuffer(eofOffset));

Review Comment:
   need text messages here, ideally with assertj assertThat which lets you use printf formatted patters



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/FilePosition.java:
##########
@@ -116,7 +116,7 @@ public void setData(BufferData bufferData,
         readOffset,
         "readOffset",
         startOffset,
-        startOffset + bufferData.getBuffer().limit() - 1);
+        startOffset + bufferData.getBuffer().limit());

Review Comment:
   going to trust you here. we have shipped the s3a input stream with an off by one error in the past. surfaced in parquet but not ORC, and only in some cases...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org