You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "rdblue (via GitHub)" <gi...@apache.org> on 2023/02/25 00:54:40 UTC

[GitHub] [iceberg] rdblue opened a new pull request, #6935: Parquet: Add page filter using page indexes

rdblue opened a new pull request, #6935:
URL: https://github.com/apache/iceberg/pull/6935

   This adds an evaluator, `ParquetIndexPageFilter.EvalVisitor` to evaluate an Iceberg Expression against Parquet's page indexes. That produces Parquet's `RowRanges`, which track ranges of rows that should be read from the Parquet file. The filter class, `ParquetIndexPageFilter` also sets the row ranges on the `ParquetFileReader`.
   
   Parquet doesn't expose some of the `RowRanges` methods publicly, so this has a `ParquetRanges` class that uses reflection to construct the ranges and to use them to filter `ParquetFileReader`.
   
   Also, **this does not update record materialization**. Record materialization will need to coordinate the values read across pages. Currently, it assumes that the pages returned are in sync, but that will change when Parquet filters pages.
   
   For example, if column A has just one page, `[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]` and column B has two pages, `[a, b, c, d, e]` and `[f, g, h, i, j, k]`, and the row range is `6-8` then the results should be `(6, g)`, `(7, h)`, and `(8, i)`. The reader for column A will get the entire page and need to skip 6 values (0, 1, 2, 3, 4, and 5), then the reader for column B will get just the second page and need to skip 1 value (f) to get in sync. The readers will need to handle this.
   
   I did this work a while ago and after looking at it with fresh eyes today, I have a couple concerns:
   1. I didn't get to writing tests for the `RowRanges` that are produced. This should be done next.
   2. The filter is applied at the row group level, but it looks like that's not how `RowRanges` works in Parquet, so this probably needs to be refactored to run the filter, produce `RowRanges`, and set them on the `ParquetFileReader` just once. In addition, if any row groups are completely eliminated by the filter, this will need to account for the missing row groups in the Iceberg Parquet reader.
   3. Since the filter is applied at the row group level, `allRows` may be incorrect (it is the row group's row count) and need to be adjusted to be the entire file rather than specific to a row group.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #6935: Parquet: Add page filter using page indexes

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on PR #6935:
URL: https://github.com/apache/iceberg/pull/6935#issuecomment-1445436703

   > The readFilteredRowGroup method provided by Parquet will detect whether there is a filter pushed down,
   and only return the filtered row-group when there is a push-down filter.
   
   I commented where we set the row ranges for the row group. I think that should work with Parquet, but it's been a while since I looked at it. Getting a public API call in would make it easier.
   
   > I think RowRanges is also at the row [group level](https://github.com/apache/parquet-mr/blob/c9cfe821448a2f99797fda7f46c70a16cc1250a9/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java#L33), Parquet-mr will [generate](https://github.com/apache/parquet-mr/blob/c9cfe821448a2f99797fda7f46c70a16cc1250a9/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1142%EF%BC%89) a RowRanges for each row-group when running the column index filter.
   
   Great! I put this together by trying to reverse engineer what was going on in Parquet, so I must have gotten it right.
   
   > We will have to wait for the next version to use, but Parquet-mr may have a release in a month, see this comment , we should be able to catch up with this release. If you agree, I can open a PR in the Parquet-mr repo.
   
   I'm all for adding what we need to Parquet. We can continue to use reflection until it is available.
   
   I think the main thing is that we don't currently handle the row ranges after we've skipped reading the pages. So the next steps are to verify what's in this PR and then to update the read paths so that values are skipped for the skipped rows.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a diff in pull request #6935: Parquet: Add page filter using page indexes

Posted by "rdblue (via GitHub)" <gi...@apache.org>.
rdblue commented on code in PR #6935:
URL: https://github.com/apache/iceberg/pull/6935#discussion_r1118134733


##########
parquet/src/main/java/org/apache/iceberg/parquet/ParquetIndexPageFilter.java:
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.parquet;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Binder;
+import org.apache.iceberg.expressions.BoundReference;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionVisitors;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.util.ByteBuffers;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges;
+import org.apache.parquet.internal.filter2.columnindex.RowRanges.Range;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ParquetIndexPageFilter {
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetIndexPageFilter.class);
+
+  private final Schema schema;
+  private final MessageType fileSchema;
+  private final Expression expr;
+
+  public ParquetIndexPageFilter(Schema schema, MessageType fileSchema, Expression expr, boolean caseSensitive) {
+    this.schema = schema;
+    this.fileSchema = fileSchema;
+    this.expr = Binder.bind(schema.asStruct(), Expressions.rewriteNot(expr), caseSensitive);
+  }
+
+  public long applyIndex(ParquetFileReader reader, int rowGroupIndex) {
+    RowRanges ranges = new EvalVisitor(reader.getRowGroups().get(rowGroupIndex), reader).eval();
+    ParquetRanges.setRanges(reader, rowGroupIndex, ranges);

Review Comment:
   @zhongyujiang, this is where we set the ranges for each row group. The `ParquetRanges` class uses reflection to do it.
   
   I'm all for getting a public API to be able to create `RowRanges` and pass them to the reader. I just don't think that we need to wait when we should be able to use reflection in the meantime.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhongyujiang commented on pull request #6935: Parquet: Add page filter using page indexes

Posted by "zhongyujiang (via GitHub)" <gi...@apache.org>.
zhongyujiang commented on PR #6935:
URL: https://github.com/apache/iceberg/pull/6935#issuecomment-1446361115

   > I'm all for adding what we need to Parquet. We can continue to use reflection until it is available.
   > 
   > I think the main thing is that we don't currently handle the row ranges after we've skipped reading the pages. So the next steps are to verify what's in this PR and then to update the read paths so that values are skipped for the skipped rows.
   
   Got it!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] zhongyujiang commented on pull request #6935: Parquet: Add page filter using page indexes

Posted by "zhongyujiang (via GitHub)" <gi...@apache.org>.
zhongyujiang commented on PR #6935:
URL: https://github.com/apache/iceberg/pull/6935#issuecomment-1445144640

   Thanks for your sharing!
   >  The filter is applied at the row group level, but it looks like that's not how RowRanges works in Parquet, so this probably needs to be refactored to run the filter, produce RowRanges, and set them on the ParquetFileReader just once.
   
   I think `RowRanges` is also at the row [group level](https://github.com/apache/parquet-mr/blob/c9cfe821448a2f99797fda7f46c70a16cc1250a9/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java#L33), Parquet-mr will [generate](https://github.com/apache/parquet-mr/blob/c9cfe821448a2f99797fda7f46c70a16cc1250a9/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1142)) a `RowRanges` for each row-group when running the column index filter. So I think the `allRows` is correct because the row index stored in the page index is also its position in the row-group.
   
   I have a convern is that  currently we can't read filtered row-group even after we set `RowRanges` in ParquetFileReader.  The `readFilteredRowGroup` method provided by Parquet will [detect](https://github.com/apache/parquet-mr/blob/c9cfe821448a2f99797fda7f46c70a16cc1250a9/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1004) whether there is a filter pushed down,
   and only return the filtered row-group when there is a push-down filter.
   So I think we maybe can add a method in ParquetFileReader that allows users to specify RowRanges by themselves, after all Iceberg will calculate these RowRanges by itself:
   
   ```
   public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges rowRanges) throws IOException {
   ```
   @rdblue WDYT? In addition, I think we can also let Parquet expose some methods to make it easier for Iceberg to implement column index filter. We will have to wait for the next version to use, but Parquet-mr may have a release in a month, see this [comment](https://github.com/apache/parquet-mr/pull/982#issuecomment-1376750703) , we should be able to catch up with this release. If you agree, I can open a PR in the Parquet-mr repo. I think we could use these [changes](https://github.com/zhongyujiang/parquet-mr/commit/25ad270f733ea8797c2bb7af24b58cd13d76a748) in Parquet. Here is my [implementation](https://github.com/zhongyujiang/iceberg/commit/b8c13c500d4437cd3f718f29df489f9fe6119e2d#diff-1c6b5e906f6299257c0d58fef74ee99569bfa24a9f3a6a257a39744d07e2a63b) based on these Parquet APIs, if you are willing to review it :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org