You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2023/03/07 00:24:21 UTC

[parquet-mr] branch master updated: PARQUET-2252: Make row range methods public (#1038)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new cc145b3f4 PARQUET-2252: Make row range methods public (#1038)
cc145b3f4 is described below

commit cc145b3f4d82c7d4c67d45907163990c4bf084d6
Author: Yujiang Zhong <42...@users.noreply.github.com>
AuthorDate: Tue Mar 7 08:24:12 2023 +0800

    PARQUET-2252: Make row range methods public (#1038)
---
 .../internal/column/columnindex/IndexIterator.java |  6 ++--
 .../internal/filter2/columnindex/RowRanges.java    | 31 +++++++++++++--------
 .../apache/parquet/hadoop/ParquetFileReader.java   | 32 +++++++++++++++++++++-
 3 files changed, 53 insertions(+), 16 deletions(-)

diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java
index e1ae75033..29235688a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/IndexIterator.java
@@ -29,14 +29,14 @@ import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder.ColumnI
 /**
  * Iterator implementation for page indexes.
  */
-class IndexIterator implements PrimitiveIterator.OfInt {
+public class IndexIterator implements PrimitiveIterator.OfInt {
   public static final PrimitiveIterator.OfInt EMPTY = IntStream.empty().iterator();
   private int index;
   private final int endIndex;
   private final IntPredicate filter;
   private final IntUnaryOperator translator;
 
-  static PrimitiveIterator.OfInt all(int pageCount) {
+  public static PrimitiveIterator.OfInt all(int pageCount) {
     return new IndexIterator(0, pageCount, i -> true, i -> i);
   }
 
@@ -44,7 +44,7 @@ class IndexIterator implements PrimitiveIterator.OfInt {
     return new IndexIterator(0, comparator.arrayLength(), i -> true, comparator::translate);
   }
 
-  static PrimitiveIterator.OfInt filter(int pageCount, IntPredicate filter) {
+  public static PrimitiveIterator.OfInt filter(int pageCount, IntPredicate filter) {
     return new IndexIterator(0, pageCount, filter, i -> i);
   }
 
diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
index 743f42de6..aabada67f 100644
--- a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
+++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java
@@ -92,7 +92,7 @@ public class RowRanges {
     }
   }
 
-  static final RowRanges EMPTY = new RowRanges(Collections.emptyList());
+  public static final RowRanges EMPTY = new RowRanges(Collections.emptyList());
 
   private final List<Range> ranges;
 
@@ -115,7 +115,7 @@ public class RowRanges {
    * @param rowCount a single row count
    * @return an immutable RowRanges
    */
-  static RowRanges createSingle(long rowCount) {
+  public static RowRanges createSingle(long rowCount) {
     return new RowRanges(new Range(0L, rowCount - 1L));
   }
 
@@ -137,7 +137,7 @@ public class RowRanges {
    * @param offsetIndex offsetIndex
    * @return a mutable RowRanges
    */
-  static RowRanges create(long rowCount, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex) {
+  public static RowRanges create(long rowCount, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex) {
     RowRanges ranges = new RowRanges();
     while (pageIndexes.hasNext()) {
       int pageIndex = pageIndexes.nextInt();
@@ -146,18 +146,22 @@ public class RowRanges {
     return ranges;
   }
 
-  /*
+  /**
    * Calculates the union of the two specified RowRanges object. The union of two range is calculated if there are no
    * elements between them. Otherwise, the two disjunct ranges are stored separately.
+   * <pre>
    * For example:
-   * [113, 241] ∪ [221, 340] = [113, 330]
+   * [113, 241] ∪ [221, 340] = [113, 340]
    * [113, 230] ∪ [231, 340] = [113, 340]
    * while
    * [113, 230] ∪ [232, 340] = [113, 230], [232, 340]
-   *
+   * </pre>
    * The result RowRanges object will contain all the row indexes that were contained in one of the specified objects.
+   * @param left left RowRanges
+   * @param right right RowRanges
+   * @return a mutable RowRanges containing all the row indexes that were contained in one of the specified objects
    */
-  static RowRanges union(RowRanges left, RowRanges right) {
+  public static RowRanges union(RowRanges left, RowRanges right) {
     RowRanges result = new RowRanges();
     Iterator<Range> it1 = left.ranges.iterator();
     Iterator<Range> it2 = right.ranges.iterator();
@@ -186,17 +190,20 @@ public class RowRanges {
     return result;
   }
 
-  /*
+  /**
    * Calculates the intersection of the two specified RowRanges object. Two ranges intersect if they have common
    * elements otherwise the result is empty.
+   * <pre>
    * For example:
    * [113, 241] ∩ [221, 340] = [221, 241]
    * while
-   * [113, 230] ∩ [231, 340] = <EMPTY>
-   *
-   * The result RowRanges object will contain all the row indexes there were contained in both of the specified objects
+   * [113, 230] ∩ [231, 340] = &lt;EMPTY&gt;
+   * </pre>
+   * @param left left RowRanges
+   * @param right right RowRanges
+   * @return a mutable RowRanges containing all the row indexes that were contained in both of the specified objects
    */
-  static RowRanges intersection(RowRanges left, RowRanges right) {
+  public static RowRanges intersection(RowRanges left, RowRanges right) {
     RowRanges result = new RowRanges();
 
     int rightIndex = 0;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 8f5117502..7fa71cb61 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -46,6 +46,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -1011,6 +1012,35 @@ public class ParquetFileReader implements Closeable {
     }
 
     RowRanges rowRanges = getRowRanges(blockIndex);
+    return readFilteredRowGroup(blockIndex, rowRanges);
+  }
+
+  /**
+   * Reads all the columns requested from the specified row group. It may skip specific pages based on the
+   * {@code rowRanges} passed in. As the rows are not aligned among the pages of the different columns, row
+   * synchronization might be required. See the documentation of the class SynchronizingColumnReader for details.
+   *
+   * @param blockIndex the index of the requested block
+   * @param rowRanges the row ranges to be read from the requested block
+   * @return the PageReadStore which can provide PageReaders for each column or null if there are no rows in this block
+   * @throws IOException if an error occurs while reading
+   * @throws IllegalArgumentException if the {@code blockIndex} is invalid or the {@code rowRanges} is null
+   */
+  public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges rowRanges) throws IOException {
+    if (blockIndex < 0 || blockIndex >= blocks.size()) {
+      throw new IllegalArgumentException(String.format("Invalid block index %s, the valid block index range are: " +
+        "[%s, %s]", blockIndex, 0, blocks.size() - 1));
+    }
+
+    if (Objects.isNull(rowRanges)) {
+      throw new IllegalArgumentException("RowRanges must not be null");
+    }
+
+    BlockMetaData block = blocks.get(blockIndex);
+    if (block.getRowCount() == 0L) {
+      return null;
+    }
+
     long rowCount = rowRanges.rowCount();
     if (rowCount == 0) {
       // There are no matching rows -> returning null
@@ -1130,7 +1160,7 @@ public class ParquetFileReader implements Closeable {
     }
   }
 
-  private ColumnIndexStore getColumnIndexStore(int blockIndex) {
+  public ColumnIndexStore getColumnIndexStore(int blockIndex) {
     ColumnIndexStore ciStore = blockIndexStores.get(blockIndex);
     if (ciStore == null) {
       ciStore = ColumnIndexStoreImpl.create(this, blocks.get(blockIndex), paths.keySet());