You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/11/01 11:14:14 UTC

[hadoop] branch branch-3.3.5 updated: HADOOP-18507. VectorIO FileRange type to support a "reference" field (#5076)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3.5
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3.5 by this push:
     new 9bf9560afe0 HADOOP-18507. VectorIO FileRange type to support a "reference" field (#5076)
9bf9560afe0 is described below

commit 9bf9560afe00ebb75bfecca9d288134607f79c34
Author: Steve Loughran <st...@cloudera.com>
AuthorDate: Mon Oct 31 21:12:13 2022 +0000

    HADOOP-18507. VectorIO FileRange type to support a "reference" field (#5076)
    
    Contributed by Steve Loughran
---
 .../main/java/org/apache/hadoop/fs/FileRange.java  |  22 +++-
 .../apache/hadoop/fs/impl/CombinedFileRange.java   |   4 +-
 .../org/apache/hadoop/fs/impl/FileRangeImpl.java   |  19 ++-
 .../apache/hadoop/fs/TestVectoredReadUtils.java    | 134 +++++++++++++++------
 .../hadoop/benchmark/VectoredReadBenchmark.java    |   2 +-
 5 files changed, 141 insertions(+), 40 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java
index e55696e9650..97da65585d6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java
@@ -55,6 +55,15 @@ public interface FileRange {
    */
   void setData(CompletableFuture<ByteBuffer> data);
 
+  /**
+   * Get any reference passed in to the file range constructor.
+   * This is not used by any implementation code; it is to help
+   * bind this API to libraries retrieving multiple stripes of
+   * data in parallel.
+   * @return a reference or null.
+   */
+  Object getReference();
+
   /**
    * Factory method to create a FileRange object.
    * @param offset starting offset of the range.
@@ -62,6 +71,17 @@ public interface FileRange {
    * @return a new instance of FileRangeImpl.
    */
   static FileRange createFileRange(long offset, int length) {
-    return new FileRangeImpl(offset, length);
+    return new FileRangeImpl(offset, length, null);
+  }
+
+  /**
+   * Factory method to create a FileRange object with an attached reference.
+   * @param offset starting offset of the range.
+   * @param length length of the range.
+   * @param reference nullable reference to store in the range.
+   * @return a new instance of FileRangeImpl.
+   */
+  static FileRange createFileRange(long offset, int length, Object reference) {
+    return new FileRangeImpl(offset, length, reference);
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java
index 516bbb2c70c..c9555a1e541 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java
@@ -29,10 +29,10 @@ import java.util.List;
  * together into a single read for efficiency.
  */
 public class CombinedFileRange extends FileRangeImpl {
-  private ArrayList<FileRange> underlying = new ArrayList<>();
+  private List<FileRange> underlying = new ArrayList<>();
 
   public CombinedFileRange(long offset, long end, FileRange original) {
-    super(offset, (int) (end - offset));
+    super(offset, (int) (end - offset), null);
     this.underlying.add(original);
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java
index 041e5f0a8d2..1239be764ba 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java
@@ -34,9 +34,21 @@ public class FileRangeImpl implements FileRange {
   private int length;
   private CompletableFuture<ByteBuffer> reader;
 
-  public FileRangeImpl(long offset, int length) {
+  /**
+   * nullable reference to store in the range.
+   */
+  private final Object reference;
+
+  /**
+   * Create a file range with an optional caller-supplied reference.
+   * @param offset offset in file
+   * @param length length of data to read.
+   * @param reference nullable reference to store in the range.
+   */
+  public FileRangeImpl(long offset, int length, Object reference) {
     this.offset = offset;
     this.length = length;
+    this.reference = reference;
   }
 
   @Override
@@ -71,4 +83,9 @@ public class FileRangeImpl implements FileRange {
   public CompletableFuture<ByteBuffer> getData() {
     return reader;
   }
+
+  @Override
+  public Object getReference() {
+    return reference;
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java
index ebf0e14053b..fdfa8f6eb6f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java
@@ -96,7 +96,10 @@ public class TestVectoredReadUtils extends HadoopTestBase {
 
   @Test
   public void testMerge() {
-    FileRange base = FileRange.createFileRange(2000, 1000);
+    // a reference to use for tracking
+    Object tracker1 = "one";
+    Object tracker2 = "two";
+    FileRange base = FileRange.createFileRange(2000, 1000, tracker1);
     CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base);
 
     // test when the gap between is too big
@@ -104,44 +107,48 @@ public class TestVectoredReadUtils extends HadoopTestBase {
         FileRange.createFileRange(5000, 1000), 2000, 4000));
     assertEquals("Number of ranges in merged range shouldn't increase",
             1, mergeBase.getUnderlying().size());
-    assertEquals("post merge offset", 2000, mergeBase.getOffset());
-    assertEquals("post merge length", 1000, mergeBase.getLength());
+    assertFileRange(mergeBase, 2000, 1000);
 
     // test when the total size gets exceeded
     assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000,
         FileRange.createFileRange(5000, 1000), 2001, 3999));
     assertEquals("Number of ranges in merged range shouldn't increase",
             1, mergeBase.getUnderlying().size());
-    assertEquals("post merge offset", 2000, mergeBase.getOffset());
-    assertEquals("post merge length", 1000, mergeBase.getLength());
+    assertFileRange(mergeBase, 2000, 1000);
 
     // test when the merge works
     assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000,
-        FileRange.createFileRange(5000, 1000), 2001, 4000));
+        FileRange.createFileRange(5000, 1000, tracker2),
+        2001, 4000));
     assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
-    assertEquals("post merge offset", 2000, mergeBase.getOffset());
-    assertEquals("post merge length", 4000, mergeBase.getLength());
+    assertFileRange(mergeBase, 2000, 4000);
+
+    Assertions.assertThat(mergeBase.getUnderlying().get(0).getReference())
+        .describedAs("reference of range %s", mergeBase.getUnderlying().get(0))
+        .isSameAs(tracker1);
+    Assertions.assertThat(mergeBase.getUnderlying().get(1).getReference())
+        .describedAs("reference of range %s", mergeBase.getUnderlying().get(1))
+        .isSameAs(tracker2);
 
     // reset the mergeBase and test with a 10:1 reduction
     mergeBase = new CombinedFileRange(200, 300, base);
-    assertEquals(200, mergeBase.getOffset());
-    assertEquals(100, mergeBase.getLength());
+    assertFileRange(mergeBase, 200, 100);
+
     assertTrue("ranges should get merged ", mergeBase.merge(500, 600,
         FileRange.createFileRange(5000, 1000), 201, 400));
     assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
-    assertEquals("post merge offset", 200, mergeBase.getOffset());
-    assertEquals("post merge length", 400, mergeBase.getLength());
+    assertFileRange(mergeBase, 200, 400);
   }
 
   @Test
   public void testSortAndMerge() {
     List<FileRange> input = Arrays.asList(
-        FileRange.createFileRange(3000, 100),
-        FileRange.createFileRange(2100, 100),
-        FileRange.createFileRange(1000, 100)
+        FileRange.createFileRange(3000, 100, "1"),
+        FileRange.createFileRange(2100, 100, null),
+        FileRange.createFileRange(1000, 100, "3")
         );
     assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
-    List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
+    final List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
             Arrays.asList(sortRanges(input)), 100, 1001, 2500);
     Assertions.assertThat(outputList)
             .describedAs("merged range size")
@@ -150,51 +157,105 @@ public class TestVectoredReadUtils extends HadoopTestBase {
     Assertions.assertThat(output.getUnderlying())
             .describedAs("merged range underlying size")
             .hasSize(3);
-    assertEquals("range[1000,3100)", output.toString());
+    // range[1000,3100)
+    assertFileRange(output, 1000, 2100);
     assertTrue("merged output ranges are disjoint",
             VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
 
     // the minSeek doesn't allow the first two to merge
     assertFalse("Ranges are non disjoint",
             VectoredReadUtils.isOrderedDisjoint(input, 100, 1000));
-    outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
+    final List<CombinedFileRange> list2 = VectoredReadUtils.mergeSortedRanges(
+        Arrays.asList(sortRanges(input)),
             100, 1000, 2100);
-    Assertions.assertThat(outputList)
+    Assertions.assertThat(list2)
             .describedAs("merged range size")
             .hasSize(2);
-    assertEquals("range[1000,1100)", outputList.get(0).toString());
-    assertEquals("range[2100,3100)", outputList.get(1).toString());
+    assertFileRange(list2.get(0), 1000, 100);
+
+    // range[2100,3100)
+    assertFileRange(list2.get(1), 2100, 1000);
+
     assertTrue("merged output ranges are disjoint",
-            VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000));
+            VectoredReadUtils.isOrderedDisjoint(list2, 100, 1000));
 
     // the maxSize doesn't allow the third range to merge
     assertFalse("Ranges are non disjoint",
             VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
-    outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
+    final List<CombinedFileRange> list3 = VectoredReadUtils.mergeSortedRanges(
+        Arrays.asList(sortRanges(input)),
             100, 1001, 2099);
-    Assertions.assertThat(outputList)
+    Assertions.assertThat(list3)
             .describedAs("merged range size")
             .hasSize(2);
-    assertEquals("range[1000,2200)", outputList.get(0).toString());
-    assertEquals("range[3000,3100)", outputList.get(1).toString());
+    // range[1000,2200)
+    CombinedFileRange range0 = list3.get(0);
+    assertFileRange(range0, 1000, 1200);
+    assertFileRange(range0.getUnderlying().get(0),
+        1000, 100, "3");
+    assertFileRange(range0.getUnderlying().get(1),
+        2100, 100, null);
+    CombinedFileRange range1 = list3.get(1);
+    // range[3000,3100)
+    assertFileRange(range1, 3000, 100);
+    assertFileRange(range1.getUnderlying().get(0),
+        3000, 100, "1");
+
     assertTrue("merged output ranges are disjoint",
-            VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
+            VectoredReadUtils.isOrderedDisjoint(list3, 100, 800));
 
     // test the round up and round down (the maxSize doesn't allow any merges)
     assertFalse("Ranges are non disjoint",
             VectoredReadUtils.isOrderedDisjoint(input, 16, 700));
-    outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
+    final List<CombinedFileRange> list4 = VectoredReadUtils.mergeSortedRanges(
+        Arrays.asList(sortRanges(input)),
             16, 1001, 100);
-    Assertions.assertThat(outputList)
+    Assertions.assertThat(list4)
             .describedAs("merged range size")
             .hasSize(3);
-    assertEquals("range[992,1104)", outputList.get(0).toString());
-    assertEquals("range[2096,2208)", outputList.get(1).toString());
-    assertEquals("range[2992,3104)", outputList.get(2).toString());
+    // range[992,1104)
+    assertFileRange(list4.get(0), 992, 112);
+    // range[2096,2208)
+    assertFileRange(list4.get(1), 2096, 112);
+    // range[2992,3104)
+    assertFileRange(list4.get(2), 2992, 112);
     assertTrue("merged output ranges are disjoint",
-            VectoredReadUtils.isOrderedDisjoint(outputList, 16, 700));
+            VectoredReadUtils.isOrderedDisjoint(list4, 16, 700));
+  }
+
+  /**
+   * Assert that a file range is non-null and has the expected offset and length.
+   * @param range range to validate
+   * @param offset offset of range
+   * @param length range length
+   */
+  private void assertFileRange(FileRange range, long offset, int length) {
+    Assertions.assertThat(range)
+        .describedAs("file range %s", range)
+        .isNotNull();
+    Assertions.assertThat(range.getOffset())
+        .describedAs("offset of %s", range)
+        .isEqualTo(offset);
+    Assertions.assertThat(range.getLength())
+        .describedAs("length of %s", range)
+        .isEqualTo(length);
+  }
+
+  /**
+   * Assert that a file range has the expected offset, length and reference.
+   * @param range range to validate
+   * @param offset offset of range
+   * @param length range length
+   * @param reference reference; may be null.
+   */
+  private void assertFileRange(FileRange range, long offset, int length, Object reference) {
+    assertFileRange(range, offset, length);
+    Assertions.assertThat(range.getReference())
+        .describedAs("reference field of file range %s", range)
+        .isEqualTo(reference);
   }
 
+
   @Test
   public void testSortAndMergeMoreCases() throws Exception {
     List<FileRange> input = Arrays.asList(
@@ -214,7 +275,9 @@ public class TestVectoredReadUtils extends HadoopTestBase {
     Assertions.assertThat(output.getUnderlying())
             .describedAs("merged range underlying size")
             .hasSize(4);
-    assertEquals("range[1000,3110)", output.toString());
+
+    assertFileRange(output, 1000, 2110);
+
     assertTrue("merged output ranges are disjoint",
             VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
 
@@ -227,7 +290,8 @@ public class TestVectoredReadUtils extends HadoopTestBase {
     Assertions.assertThat(output.getUnderlying())
             .describedAs("merged range underlying size")
             .hasSize(4);
-    assertEquals("range[1000,3200)", output.toString());
+    assertFileRange(output, 1000, 2200);
+
     assertTrue("merged output ranges are disjoint",
             VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
 
diff --git a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java
index 631842f78e2..5df46c36786 100644
--- a/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java
+++ b/hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java
@@ -169,7 +169,7 @@ public class VectoredReadBenchmark {
 
     FileRangeCallback(AsynchronousFileChannel channel, long offset,
                       int length, Joiner joiner, ByteBuffer buffer) {
-      super(offset, length);
+      super(offset, length, null);
       this.channel = channel;
       this.joiner = joiner;
       this.buffer = buffer;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org