Posted to issues@iceberg.apache.org by "singhpk234 (via GitHub)" <gi...@apache.org> on 2023/04/04 19:46:55 UTC

[GitHub] [iceberg] singhpk234 opened a new pull request, #7279: [Parquet] Eagerly fetch row groups when reading parquet

singhpk234 opened a new pull request, #7279:
URL: https://github.com/apache/iceberg/pull/7279

   ### About the changes
   
   This change attempts to prefetch row groups while reading Parquet files.
   This is the first part of the changes proposed in https://github.com/apache/iceberg/issues/647
   
   cc @jackye1995 @rdblue 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] ConeyLiu commented on pull request #7279: [Parquet] Eagerly fetch row groups when reading parquet

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on PR #7279:
URL: https://github.com/apache/iceberg/pull/7279#issuecomment-1501597160

   Should we add a parameter to enable or disable this? Prefetching row groups means using more memory. @singhpk234, are there any benchmark results to show?




[GitHub] [iceberg] singhpk234 commented on a diff in pull request #7279: [Parquet] Eagerly fetch row groups when reading parquet

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #7279:
URL: https://github.com/apache/iceberg/pull/7279#discussion_r1161899644


##########
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java:
##########
@@ -109,11 +117,23 @@ public CloseableIterator<T> iterator() {
     private final int batchSize;
     private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadata;
     private final boolean reuseContainers;
-    private int nextRowGroup = 0;
     private long nextRowGroupStart = 0;
     private long valuesRead = 0;
     private T last = null;
     private final long[] rowGroupsStartRowPos;
+    private final int totalRowGroups;
+    private static final ExecutorService prefetchService =
+        MoreExecutors.getExitingExecutorService(
+            (ThreadPoolExecutor)
+                Executors.newFixedThreadPool(
+                    4,

Review Comment:
   ACK.





[GitHub] [iceberg] singhpk234 commented on pull request #7279: [Parquet] Eagerly fetch row groups when reading parquet

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on PR #7279:
URL: https://github.com/apache/iceberg/pull/7279#issuecomment-1496519281

   One more idea in this context, at a higher level: can we prefetch the next task, keep it in memory, and expose an iterator around it while we are reading a task group? To be specific, here:
   
   https://github.com/apache/iceberg/blob/49e930877a16bce2df51d6e51b737d2969208644/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java#L132-L157
   
   That way we don't have to wait to read the next task and can directly ask for an iterator over it.
   
   cc @jackye1995 @rdblue, please let me know your thoughts on this as well.
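
A rough sketch of the task-level prefetch idea, using only the JDK. `TaskPrefetchingIterator` and the `Supplier<List<T>>` task loaders are hypothetical stand-ins for illustration, not Iceberg classes; the real `BaseReader` opens tasks differently. The sketch assumes each task is small enough to be materialized as a list, and it keeps at most two tasks' rows in memory at once (the one being consumed and the one being loaded).

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/** Hypothetical wrapper: loads task N+1 in the background while task N is consumed. */
final class TaskPrefetchingIterator<T> implements Iterator<T> {
  private final Iterator<Supplier<List<T>>> tasks; // each supplier loads one task's rows
  private final Executor executor;
  private CompletableFuture<List<T>> pending; // null once every task has been submitted
  private Iterator<T> current = Collections.emptyIterator();

  TaskPrefetchingIterator(List<Supplier<List<T>>> taskLoaders, Executor executor) {
    this.tasks = taskLoaders.iterator();
    this.executor = executor;
    submitNext(); // start loading the first task right away
  }

  private void submitNext() {
    pending = tasks.hasNext() ? CompletableFuture.supplyAsync(tasks.next(), executor) : null;
  }

  @Override
  public boolean hasNext() {
    while (!current.hasNext() && pending != null) {
      current = pending.join().iterator(); // blocks only if the load is still running
      submitNext(); // overlap the following load with consumption of this task
    }
    return current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }
}
```

Whether this helps in practice would depend on how much of a task's cost is I/O wait versus decoding, which the benchmarks would need to show.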
   
   




[GitHub] [iceberg] sririshindra commented on a diff in pull request #7279: [Parquet] Eagerly fetch row groups when reading parquet

Posted by "sririshindra (via GitHub)" <gi...@apache.org>.
sririshindra commented on code in PR #7279:
URL: https://github.com/apache/iceberg/pull/7279#discussion_r1157857579


##########
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java:
##########
@@ -154,21 +177,47 @@ public T next() {
     }
 
     private void advance() {
-      while (shouldSkip[nextRowGroup]) {
-        nextRowGroup += 1;
-        reader.skipNextRowGroup();
-      }
-      PageReadStore pages;
       try {
-        pages = reader.readNextRowGroup();
-      } catch (IOException e) {
-        throw new RuntimeIOException(e);
+        Preconditions.checkNotNull(prefetchRowGroupFuture, "future should not be null");
+        PageReadStore pages = prefetchRowGroupFuture.get();
+
+        if (prefetchedRowGroup >= totalRowGroups) {
+          return;
+        }
+        Preconditions.checkState(
+            pages != null,
+            "advance() should have been only when there was at least one row group to read");
+        long rowPosition = rowGroupsStartRowPos[prefetchedRowGroup];
+        model.setRowGroupInfo(pages, columnChunkMetadata.get(prefetchedRowGroup), rowPosition);
+        nextRowGroupStart += pages.getRowCount();
+        prefetchedRowGroup += 1;
+        prefetchNextRowGroup(); // eagerly fetch the next row group

Review Comment:
   Why stop with just the next row group? Can we instead maintain a queue of row group futures, where we keep populating the queue with the next row group continuously and asynchronously until we exhaust all available row groups in the file?
   We can dequeue from the queue as needed. This way, we may reduce the likelihood of the `prefetchRowGroupFuture.get()` call ever having to wait for the future computation to finish. What do you think?
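
One way the queue of futures could look, as a JDK-only sketch. `RowGroupPrefetchQueue`, its `loader` function, and the `depth` parameter are all hypothetical names for illustration; `loader` stands in for whatever reads row group `i`. The window is bounded by `depth` so that not every row group is held in memory at once, and `loader` would need to be safe to call from several threads when `depth` is greater than 1.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.IntFunction;

/** Hypothetical helper: keeps up to `depth` row-group loads in flight, in file order. */
final class RowGroupPrefetchQueue<P> {
  private final Deque<Future<P>> inFlight = new ArrayDeque<>();
  private final ExecutorService pool;
  private final IntFunction<P> loader; // stands in for "read row group i"
  private final int totalRowGroups;
  private final int depth;
  private int nextToSubmit = 0;

  RowGroupPrefetchQueue(
      ExecutorService pool, IntFunction<P> loader, int totalRowGroups, int depth) {
    if (depth < 1) {
      throw new IllegalArgumentException("depth must be >= 1");
    }
    this.pool = pool;
    this.loader = loader;
    this.totalRowGroups = totalRowGroups;
    this.depth = depth;
    fill();
  }

  /** Submits loads until the window is full or the file is exhausted. */
  private void fill() {
    while (inFlight.size() < depth && nextToSubmit < totalRowGroups) {
      final int index = nextToSubmit++;
      Callable<P> task = () -> loader.apply(index);
      inFlight.addLast(pool.submit(task));
    }
  }

  boolean hasNext() {
    return !inFlight.isEmpty();
  }

  P next() {
    Future<P> head = inFlight.removeFirst(); // NoSuchElementException if nothing is left
    try {
      P rowGroup = head.get(); // waits only if this load has not finished yet
      fill(); // top the window back up
      return rowGroup;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for a row group", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("row group load failed", e.getCause());
    }
  }
}
```

With `depth = 1` this behaves like the single-future approach in the diff; larger values trade memory for a lower chance of blocking in `get()`.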



##########
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java:
##########
@@ -109,11 +117,23 @@ public CloseableIterator<T> iterator() {
     private final int batchSize;
     private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadata;
     private final boolean reuseContainers;
-    private int nextRowGroup = 0;
     private long nextRowGroupStart = 0;
     private long valuesRead = 0;
     private T last = null;
     private final long[] rowGroupsStartRowPos;
+    private final int totalRowGroups;
+    private static final ExecutorService prefetchService =
+        MoreExecutors.getExitingExecutorService(
+            (ThreadPoolExecutor)
+                Executors.newFixedThreadPool(
+                    4,

Review Comment:
   Just curious why you chose the number of threads in the pool to be 4. In the current implementation you are prefetching only one row group at a time, correct? Shouldn't one thread in the thread pool suffice?





[GitHub] [iceberg] singhpk234 commented on pull request #7279: [Parquet] Eagerly fetch row groups when reading parquet

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on PR #7279:
URL: https://github.com/apache/iceberg/pull/7279#issuecomment-1499420946

   cc @aokolnychyi 




[GitHub] [iceberg] singhpk234 commented on a diff in pull request #7279: [Parquet] Eagerly fetch row groups when reading parquet

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #7279:
URL: https://github.com/apache/iceberg/pull/7279#discussion_r1164529427


##########
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java:
##########
@@ -154,21 +177,47 @@ public T next() {
     }
 
     private void advance() {
-      while (shouldSkip[nextRowGroup]) {
-        nextRowGroup += 1;
-        reader.skipNextRowGroup();
-      }
-      PageReadStore pages;
       try {
-        pages = reader.readNextRowGroup();
-      } catch (IOException e) {
-        throw new RuntimeIOException(e);
+        Preconditions.checkNotNull(prefetchRowGroupFuture, "future should not be null");
+        PageReadStore pages = prefetchRowGroupFuture.get();
+
+        if (prefetchedRowGroup >= totalRowGroups) {
+          return;
+        }
+        Preconditions.checkState(
+            pages != null,
+            "advance() should have been only when there was at least one row group to read");
+        long rowPosition = rowGroupsStartRowPos[prefetchedRowGroup];
+        model.setRowGroupInfo(pages, columnChunkMetadata.get(prefetchedRowGroup), rowPosition);
+        nextRowGroupStart += pages.getRowCount();
+        prefetchedRowGroup += 1;
+        prefetchNextRowGroup(); // eagerly fetch the next row group

Review Comment:
   I was testing this with concurrent reads, and it looks like `reader.readRowGroup(int blockIndex)` is not thread-safe: it requires the `SeekableInputStream` that the Parquet reader holds to be seeked to the given offset first. Since the stream is a class variable, concurrent reads were causing correctness issues.
   One possible solution is to have a pool of `ParquetFileReader`s: get one reader from the pool, ask it for the desired row group, and close the pool once all the row groups have been read. I'm thinking of prototyping this change.
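
A minimal sketch of the reader-pool idea, assuming a generic `AutoCloseable` reader type rather than the real `ParquetFileReader`. `ReaderPool` and `withReader` are hypothetical names. The point it illustrates is that each pooled reader, and therefore its underlying stream, is used by only one thread at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical pool: each pooled reader is handed to one thread at a time. */
final class ReaderPool<R extends AutoCloseable> implements AutoCloseable {
  private final BlockingQueue<R> idle;
  private final List<R> all = new ArrayList<>();

  ReaderPool(int size, Supplier<R> factory) {
    this.idle = new ArrayBlockingQueue<>(size); // rejects size < 1
    for (int i = 0; i < size; i++) {
      R reader = factory.get();
      all.add(reader);
      idle.add(reader);
    }
  }

  /** Borrows a reader, runs `work` with exclusive access, then returns the reader. */
  <T> T withReader(Function<R, T> work) {
    R reader;
    try {
      reader = idle.take(); // blocks while every reader is busy
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for a reader", e);
    }
    try {
      return work.apply(reader);
    } finally {
      idle.add(reader); // always hand the reader back, even if `work` failed
    }
  }

  /** Closes every reader; call only after all row groups have been read. */
  @Override
  public void close() {
    RuntimeException failure = null;
    for (R reader : all) {
      try {
        reader.close();
      } catch (Exception e) {
        if (failure == null) {
          failure = new RuntimeException("failed to close a pooled reader", e);
        } else {
          failure.addSuppressed(e);
        }
      }
    }
    if (failure != null) {
      throw failure;
    }
  }
}
```

The cost of this design is one open stream (plus footer parsing, unless the footer is shared) per pooled reader, so the pool size would likely need to stay small.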





[GitHub] [iceberg] ConeyLiu commented on a diff in pull request #7279: [Parquet] Eagerly fetch row groups when reading parquet

Posted by "ConeyLiu (via GitHub)" <gi...@apache.org>.
ConeyLiu commented on code in PR #7279:
URL: https://github.com/apache/iceberg/pull/7279#discussion_r1161572794


##########
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java:
##########
@@ -109,11 +117,23 @@ public CloseableIterator<T> iterator() {
     private final int batchSize;
     private final List<Map<ColumnPath, ColumnChunkMetaData>> columnChunkMetadata;
     private final boolean reuseContainers;
-    private int nextRowGroup = 0;
     private long nextRowGroupStart = 0;
     private long valuesRead = 0;
     private T last = null;
     private final long[] rowGroupsStartRowPos;
+    private final int totalRowGroups;
+    private static final ExecutorService prefetchService =
+        MoreExecutors.getExitingExecutorService(
+            (ThreadPoolExecutor)
+                Executors.newFixedThreadPool(
+                    4,

Review Comment:
   The `ThreadPool` should be shared by all the readers in the process, but the hard-coded size may need to be improved.
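
As an illustration of a shared pool without a hard-coded size, here is a JDK-only sketch. `PrefetchPools` and the property name `example.parquet.prefetch.threads` are made up for this example and are not existing Iceberg settings. It uses daemon threads so the pool does not keep the JVM alive, which is roughly comparable in intent to the `MoreExecutors.getExitingExecutorService` call in the diff.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical holder for a process-wide prefetch pool with a configurable size. */
final class PrefetchPools {
  // Made-up property name, used only for this sketch.
  static final String POOL_SIZE_PROPERTY = "example.parquet.prefetch.threads";
  static final int DEFAULT_POOL_SIZE = 4;

  private PrefetchPools() {}

  /** Reads the size from a system property, falling back to the default; never below 1. */
  static int configuredPoolSize() {
    return Math.max(1, Integer.getInteger(POOL_SIZE_PROPERTY, DEFAULT_POOL_SIZE));
  }

  /** Lazily creates the single pool shared by every reader in the process. */
  static ExecutorService shared() {
    return Holder.INSTANCE;
  }

  private static final class Holder {
    static final ExecutorService INSTANCE =
        Executors.newFixedThreadPool(configuredPoolSize(), daemonFactory());
  }

  private static ThreadFactory daemonFactory() {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread thread = new Thread(runnable, "parquet-prefetch-" + counter.getAndIncrement());
      thread.setDaemon(true); // do not block JVM exit
      return thread;
    };
  }
}
```

Note that the size is read once, when the pool is first used, so changing the property afterwards has no effect on the running pool.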





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7279: [Parquet] Eagerly fetch row groups when reading parquet

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7279:
URL: https://github.com/apache/iceberg/pull/7279#discussion_r1161764815


##########
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java:
##########
@@ -154,21 +177,47 @@ public T next() {
     }
 
     private void advance() {
-      while (shouldSkip[nextRowGroup]) {
-        nextRowGroup += 1;
-        reader.skipNextRowGroup();
-      }
-      PageReadStore pages;
       try {
-        pages = reader.readNextRowGroup();
-      } catch (IOException e) {
-        throw new RuntimeIOException(e);
+        Preconditions.checkNotNull(prefetchRowGroupFuture, "future should not be null");
+        PageReadStore pages = prefetchRowGroupFuture.get();
+
+        if (prefetchedRowGroup >= totalRowGroups) {
+          return;
+        }
+        Preconditions.checkState(
+            pages != null,
+            "advance() should have been only when there was at least one row group to read");
+        long rowPosition = rowGroupsStartRowPos[prefetchedRowGroup];
+        model.setRowGroupInfo(pages, columnChunkMetadata.get(prefetchedRowGroup), rowPosition);
+        nextRowGroupStart += pages.getRowCount();
+        prefetchedRowGroup += 1;
+        prefetchNextRowGroup(); // eagerly fetch the next row group

Review Comment:
   I'm in favor of having the "eagerness" of the prefetch be configurable so long as we ensure a sane default. We can measure what an appropriate value should be for this but it should balance between memory consumption and performance. 





[GitHub] [iceberg] singhpk234 commented on pull request #7279: [Parquet] Eagerly fetch row groups when reading parquet

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on PR #7279:
URL: https://github.com/apache/iceberg/pull/7279#issuecomment-1507322338

   > @singhpk234 is there any benchmark result to show?
   
   @ConeyLiu working on the benchmarks 




[GitHub] [iceberg] singhpk234 commented on a diff in pull request #7279: [Parquet] Eagerly fetch row groups when reading parquet

Posted by "singhpk234 (via GitHub)" <gi...@apache.org>.
singhpk234 commented on code in PR #7279:
URL: https://github.com/apache/iceberg/pull/7279#discussion_r1160094891


##########
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java:
##########
@@ -154,21 +177,47 @@ public T next() {
     }
 
     private void advance() {
-      while (shouldSkip[nextRowGroup]) {
-        nextRowGroup += 1;
-        reader.skipNextRowGroup();
-      }
-      PageReadStore pages;
       try {
-        pages = reader.readNextRowGroup();
-      } catch (IOException e) {
-        throw new RuntimeIOException(e);
+        Preconditions.checkNotNull(prefetchRowGroupFuture, "future should not be null");
+        PageReadStore pages = prefetchRowGroupFuture.get();
+
+        if (prefetchedRowGroup >= totalRowGroups) {
+          return;
+        }
+        Preconditions.checkState(
+            pages != null,
+            "advance() should have been only when there was at least one row group to read");
+        long rowPosition = rowGroupsStartRowPos[prefetchedRowGroup];
+        model.setRowGroupInfo(pages, columnChunkMetadata.get(prefetchedRowGroup), rowPosition);
+        nextRowGroupStart += pages.getRowCount();
+        prefetchedRowGroup += 1;
+        prefetchNextRowGroup(); // eagerly fetch the next row group

Review Comment:
   I agree, we can certainly do that. For now I kept it simple and closer to the existing PoC code. Let me add this in later revisions if folks are on board.
   
   Though we might not want to load all the row groups in one shot, as it might cause memory pressure; maybe having a conf to control this would be helpful.
   
   P.S. I will also add prefetching of Parquet data pages shortly.


