Posted to common-issues@hadoop.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/09/22 13:00:00 UTC

[jira] [Commented] (HADOOP-18463) Add an integration test to process data asynchronously during vectored read.

    [ https://issues.apache.org/jira/browse/HADOOP-18463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17608270#comment-17608270 ] 

ASF GitHub Bot commented on HADOOP-18463:
-----------------------------------------

steveloughran commented on code in PR #4921:
URL: https://github.com/apache/hadoop/pull/4921#discussion_r977566267


##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -364,6 +373,63 @@ public void testMultipleVectoredReads() throws Exception {
     }
   }
 
+  /**
+   * This test creates list of ranges and then submit a readVectored
+   * operation and then uses a separate thread pool to process the
+   * results asynchronously.
+   */
+  @Test
+  public void testVectoredIOEndToEnd() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+    fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+    ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
+    CountDownLatch countDown = new CountDownLatch(fileRanges.size());
+
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
+      // user can perform other computations while waiting for IO.
+      for (FileRange res : fileRanges) {
+        dataProcessor.submit(() -> {
+          try {
+            readBufferValidateDataAndReturnToPool(pool, res, countDown);
+          } catch (Exception e) {
+            LOG.error("Error while process result for {} ", res, e);
+          }
+        });
+      }
+      if (!countDown.await(100, TimeUnit.SECONDS)) {

Review Comment:
   timeout should be a static constant and more visible



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -364,6 +373,63 @@ public void testMultipleVectoredReads() throws Exception {
     }
   }
 
+  /**
+   * This test creates list of ranges and then submit a readVectored
+   * operation and then uses a separate thread pool to process the
+   * results asynchronously.
+   */
+  @Test
+  public void testVectoredIOEndToEnd() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+    fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+    ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
+    CountDownLatch countDown = new CountDownLatch(fileRanges.size());
+
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
+      // user can perform other computations while waiting for IO.
+      for (FileRange res : fileRanges) {
+        dataProcessor.submit(() -> {
+          try {
+            readBufferValidateDataAndReturnToPool(pool, res, countDown);
+          } catch (Exception e) {
+            LOG.error("Error while process result for {} ", res, e);

Review Comment:
   The exception should be saved to a field/variable, with the JUnit thread rethrowing it if the value is non-null.
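
A minimal sketch of that pattern, using only `java.util.concurrent` so it stands alone. The class name `WorkerFailureSketch`, the method `runAll` and the constant `AWAIT_TIMEOUT_SECONDS` are hypothetical and not part of the PR; the real test would run `readBufferValidateDataAndReturnToPool` in place of the generic tasks.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class WorkerFailureSketch {

  /** Hypothetical shared timeout, declared once instead of repeated literals. */
  public static final long AWAIT_TIMEOUT_SECONDS = 100;

  /**
   * Runs each task on a worker pool, records the first failure, and
   * rethrows it on the calling (test) thread once the workers are done.
   */
  public static void runAll(List<Runnable> tasks) {
    ExecutorService workers = Executors.newFixedThreadPool(2);
    CountDownLatch done = new CountDownLatch(tasks.size());
    AtomicReference<Throwable> failure = new AtomicReference<>();
    try {
      for (Runnable task : tasks) {
        workers.submit(() -> {
          try {
            task.run();
          } catch (Throwable t) {
            // Keep only the first failure; later ones are dropped.
            failure.compareAndSet(null, t);
          } finally {
            // Count down on failure too, so the caller is not left
            // waiting for the full timeout.
            done.countDown();
          }
        });
      }
      boolean finished;
      try {
        finished = done.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new AssertionError("Interrupted while waiting for workers", e);
      }
      if (!finished) {
        throw new AssertionError("Timed out waiting for workers");
      }
      Throwable t = failure.get();
      if (t != null) {
        // Rethrow on the calling thread so the test actually fails.
        throw new AssertionError("Worker failed", t);
      }
    } finally {
      workers.shutdownNow();
    }
  }
}
```

With only a `LOG.error` in the worker, a validation failure never reaches the thread that decides whether the test passes; storing the exception and rethrowing it there closes that gap.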



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -364,6 +373,63 @@ public void testMultipleVectoredReads() throws Exception {
     }
   }
 
+  /**
+   * This test creates list of ranges and then submit a readVectored
+   * operation and then uses a separate thread pool to process the
+   * results asynchronously.
+   */
+  @Test
+  public void testVectoredIOEndToEnd() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+    fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+    ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
+    CountDownLatch countDown = new CountDownLatch(fileRanges.size());
+
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
+      // user can perform other computations while waiting for IO.
+      for (FileRange res : fileRanges) {
+        dataProcessor.submit(() -> {
+          try {
+            readBufferValidateDataAndReturnToPool(pool, res, countDown);
+          } catch (Exception e) {
+            LOG.error("Error while process result for {} ", res, e);
+          }
+        });
+      }
+      if (!countDown.await(100, TimeUnit.SECONDS)) {
+        throw new AssertionError("Error while processing vectored io results");
+      }
+    } finally {
+      pool.release();
+      HadoopExecutors.shutdown(dataProcessor, LOG, 100, TimeUnit.SECONDS);
+    }
+  }
+
+  private void readBufferValidateDataAndReturnToPool(ByteBufferPool pool,
+                                                     FileRange res,
+                                                     CountDownLatch countDownLatch)
+          throws IOException, TimeoutException {
+    CompletableFuture<ByteBuffer> data = res.getData();
+    ByteBuffer buffer = FutureIO.awaitFuture(data,

Review Comment:
   I think we all (you, me, everyone else) need to spend some time working with CompletableFuture and chaining them.
   
   In this code, chain the validation onto the future:
   ```
   data.thenAccept(buffer -> {
     // all the validation
   });
   ```
   
   and await() the result of that.
   
   It's a mess because Java's checked exceptions cripple their lambda-expression methods when IO operations are invoked. But if we try to live in their world, at least we will get more insight into how we could actually improve our own code to work better there. Though it may, of course, be too late by now.
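
A self-contained sketch of the chaining idea, showing the usual workaround for checked exceptions inside the lambda. `ChainedValidationSketch`, `validate` and `validateWhenRead` are hypothetical names, and the length check is only a placeholder for the test's real data validation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

public class ChainedValidationSketch {

  /** Hypothetical validation step that may throw a checked exception. */
  public static void validate(ByteBuffer buffer, int expectedLength)
      throws IOException {
    if (buffer.remaining() != expectedLength) {
      throw new IOException("Expected " + expectedLength
          + " bytes but got " + buffer.remaining());
    }
  }

  /**
   * Chains validation onto the read future. The returned future completes
   * normally once validation passes, and exceptionally if either the read
   * or the validation fails.
   */
  public static CompletableFuture<Void> validateWhenRead(
      CompletableFuture<ByteBuffer> data, int expectedLength) {
    return data.thenAccept(buffer -> {
      try {
        validate(buffer, expectedLength);
      } catch (IOException e) {
        // The Consumer passed to thenAccept cannot throw checked
        // exceptions, so wrap the IOException in an unchecked one.
        throw new UncheckedIOException(e);
      }
    });
  }
}
```

The caller then waits on the returned future rather than on the raw data future, for example with `get(timeout, unit)`. A validation failure surfaces there as the cause of the thrown exception, so no separate latch or logging path is needed to notice it.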



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -364,6 +373,63 @@ public void testMultipleVectoredReads() throws Exception {
     }
   }
 
+  /**
+   * This test creates list of ranges and then submit a readVectored
+   * operation and then uses a separate thread pool to process the
+   * results asynchronously.
+   */
+  @Test
+  public void testVectoredIOEndToEnd() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+    fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+    ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
+    CountDownLatch countDown = new CountDownLatch(fileRanges.size());
+
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
+      // user can perform other computations while waiting for IO.
+      for (FileRange res : fileRanges) {
+        dataProcessor.submit(() -> {
+          try {
+            readBufferValidateDataAndReturnToPool(pool, res, countDown);
+          } catch (Exception e) {
+            LOG.error("Error while process result for {} ", res, e);
+          }
+        });
+      }
+      if (!countDown.await(100, TimeUnit.SECONDS)) {
+        throw new AssertionError("Error while processing vectored io results");
+      }
+    } finally {
+      pool.release();

Review Comment:
   how about adding an assert on L408 that the pool has its buffers returned?
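
One way such an assert could be supported is a pool that counts buffers handed out and not yet returned. The sketch below is a hypothetical stand-in written without Hadoop dependencies: `CountingPoolSketch` and `outstanding()` are invented names, and in the real test the counting would have to wrap or extend whichever `ByteBufferPool` implementation the test uses.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingPoolSketch {

  /** Number of buffers handed out and not yet returned. */
  private final AtomicInteger outstanding = new AtomicInteger();

  /** Hands out a heap buffer and records it as outstanding. */
  public ByteBuffer getBuffer(int length) {
    outstanding.incrementAndGet();
    return ByteBuffer.allocate(length);
  }

  /** Records a buffer as returned; this sketch does not reuse it. */
  public void putBuffer(ByteBuffer buffer) {
    outstanding.decrementAndGet();
  }

  /** Count of buffers still held by callers. */
  public int outstanding() {
    return outstanding.get();
  }
}
```

The test could then assert that `outstanding()` is zero once all workers have finished and before the pool is released, which would catch a code path that reads a range but never returns its buffer.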



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -364,6 +373,63 @@ public void testMultipleVectoredReads() throws Exception {
     }
   }
 
+  /**
+   * This test creates list of ranges and then submit a readVectored
+   * operation and then uses a separate thread pool to process the
+   * results asynchronously.
+   */
+  @Test
+  public void testVectoredIOEndToEnd() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+    fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+    ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
+    CountDownLatch countDown = new CountDownLatch(fileRanges.size());
+
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
+      // user can perform other computations while waiting for IO.
+      for (FileRange res : fileRanges) {
+        dataProcessor.submit(() -> {
+          try {
+            readBufferValidateDataAndReturnToPool(pool, res, countDown);
+          } catch (Exception e) {
+            LOG.error("Error while process result for {} ", res, e);
+          }
+        });
+      }
+      if (!countDown.await(100, TimeUnit.SECONDS)) {
+        throw new AssertionError("Error while processing vectored io results");

Review Comment:
   declare timeout



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java:
##########
@@ -364,6 +373,63 @@ public void testMultipleVectoredReads() throws Exception {
     }
   }
 
+  /**
+   * This test creates list of ranges and then submit a readVectored
+   * operation and then uses a separate thread pool to process the
+   * results asynchronously.
+   */
+  @Test
+  public void testVectoredIOEndToEnd() throws Exception {
+    FileSystem fs = getFileSystem();
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(8 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(14 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(10 * 1024, 100));
+    fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100));
+    fileRanges.add(FileRange.createFileRange(40 * 1024, 1024));
+
+    ExecutorService dataProcessor = Executors.newFixedThreadPool(5);
+    CountDownLatch countDown = new CountDownLatch(fileRanges.size());
+
+    try (FSDataInputStream in = fs.open(path(VECTORED_READ_FILE_NAME))) {
+      in.readVectored(fileRanges, value -> pool.getBuffer(true, value));
+      // user can perform other computations while waiting for IO.
+      for (FileRange res : fileRanges) {
+        dataProcessor.submit(() -> {
+          try {
+            readBufferValidateDataAndReturnToPool(pool, res, countDown);
+          } catch (Exception e) {
+            LOG.error("Error while process result for {} ", res, e);
+          }
+        });
+      }
+      if (!countDown.await(100, TimeUnit.SECONDS)) {
+        throw new AssertionError("Error while processing vectored io results");
+      }
+    } finally {
+      pool.release();
+      HadoopExecutors.shutdown(dataProcessor, LOG, 100, TimeUnit.SECONDS);

Review Comment:
   use same constant as proposed for L100





> Add an integration test to process data asynchronously during vectored read.
> ----------------------------------------------------------------------------
>
>                 Key: HADOOP-18463
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18463
>             Project: Hadoop Common
>          Issue Type: Sub-task
>            Reporter: Mukund Thakur
>            Assignee: Mukund Thakur
>            Priority: Major
>              Labels: pull-request-available
>



