You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/03/24 18:41:26 UTC

[iceberg] 11/18: Parquet: Fix row group filters with promoted types (#2232)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch 0.11.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 2feeab5a47c91386a2ed26b744f1936157358521
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Fri Feb 12 20:38:57 2021 -0800

    Parquet: Fix row group filters with promoted types (#2232)
    
    This fixes Parquet row group filters when types have been promoted from int to long or from float to double.
    
    The filters are passed the file schema after ids are added, which is used to convert dictionary values or lower/upper bounds. That conversion currently uses the file's types to deserialize, but the filter expression is bound to the table types. If the types differ, then comparison in the evaluator fails.
    
    This updates the conversion to first deserialize the Parquet value and then promote it if the table's type has changed. Only int to long and float to double are needed because those are the only type promotions that use a different representation.
---
 .../iceberg/data/TestMetricsRowGroupFilter.java      |  9 +++++++++
 .../apache/iceberg/parquet/ParquetConversions.java   | 15 +++++++++++++++
 .../parquet/ParquetDictionaryRowGroupFilter.java     |  6 +++++-
 .../parquet/ParquetMetricsRowGroupFilter.java        | 20 ++++++++++++++++++--
 .../parquet/TestDictionaryRowGroupFilter.java        |  8 ++++++++
 5 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
index b82d6eb..f03544d 100644
--- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
+++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java
@@ -812,6 +812,15 @@ public class TestMetricsRowGroupFilter {
     Assert.assertTrue("Should read if IN is not evaluated", shouldRead);
   }
 
+  @Test
+  public void testParquetTypePromotion() {
+    Assume.assumeTrue("Only valid for Parquet", format == FileFormat.PARQUET);
+    Schema promotedSchema = new Schema(required(1, "id", Types.LongType.get()));
+    boolean shouldRead = new ParquetMetricsRowGroupFilter(promotedSchema, equal("id", INT_MIN_VALUE + 1), true)
+        .shouldRead(parquetSchema, rowGroupMetadata);
+    Assert.assertTrue("Should succeed with promoted schema", shouldRead);
+  }
+
   private boolean shouldRead(Expression expression) {
     return shouldRead(expression, true);
   }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
index 431c636..78b3a31 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetConversions.java
@@ -68,6 +68,21 @@ class ParquetConversions {
     }
   }
 
+  static Function<Object, Object> converterFromParquet(PrimitiveType parquetType, Type icebergType) {
+    Function<Object, Object> fromParquet = converterFromParquet(parquetType);
+    if (icebergType != null) {
+      if (icebergType.typeId() == Type.TypeID.LONG &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32) {
+        return value -> ((Integer) fromParquet.apply(value)).longValue();
+      } else if (icebergType.typeId() == Type.TypeID.DOUBLE &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FLOAT) {
+        return value -> ((Float) fromParquet.apply(value)).doubleValue();
+      }
+    }
+
+    return fromParquet;
+  }
+
   static Function<Object, Object> converterFromParquet(PrimitiveType type) {
     if (type.getOriginalType() != null) {
       switch (type.getOriginalType()) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index d72cf49..37c7d6e 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.NaNUtil;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -49,6 +50,7 @@ import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 
 public class ParquetDictionaryRowGroupFilter {
+  private final Schema schema;
   private final Expression expr;
 
   public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound) {
@@ -56,6 +58,7 @@ public class ParquetDictionaryRowGroupFilter {
   }
 
   public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound, boolean caseSensitive) {
+    this.schema = schema;
     StructType struct = schema.asStruct();
     this.expr = Binder.bind(struct, Expressions.rewriteNot(unbound), caseSensitive);
   }
@@ -96,8 +99,9 @@ public class ParquetDictionaryRowGroupFilter {
         PrimitiveType colType = fileSchema.getType(desc.getPath()).asPrimitiveType();
         if (colType.getId() != null) {
           int id = colType.getId().intValue();
+          Type icebergType = schema.findType(id);
           cols.put(id, desc);
-          conversions.put(id, ParquetConversions.converterFromParquet(colType));
+          conversions.put(id, ParquetConversions.converterFromParquet(colType, icebergType));
         }
       }
 
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index ee026cc..f83d701 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -77,7 +77,7 @@ public class ParquetMetricsRowGroupFilter {
   private static final boolean ROWS_CANNOT_MATCH = false;
 
   private class MetricsEvalVisitor extends BoundExpressionVisitor<Boolean> {
-    private Map<Integer, Statistics> stats = null;
+    private Map<Integer, Statistics<?>> stats = null;
     private Map<Integer, Long> valueCounts = null;
     private Map<Integer, Function<Object, Object>> conversions = null;
 
@@ -93,9 +93,10 @@ public class ParquetMetricsRowGroupFilter {
         PrimitiveType colType = fileSchema.getType(col.getPath().toArray()).asPrimitiveType();
         if (colType.getId() != null) {
           int id = colType.getId().intValue();
+          Type icebergType = schema.findType(id);
           stats.put(id, col.getStatistics());
           valueCounts.put(id, col.getValueCount());
-          conversions.put(id, ParquetConversions.converterFromParquet(colType));
+          conversions.put(id, ParquetConversions.converterFromParquet(colType, icebergType));
         }
       }
 
@@ -502,4 +503,19 @@ public class ParquetMetricsRowGroupFilter {
     return statistics.getNumNulls() < valueCount &&
         (statistics.getMaxBytes() == null || statistics.getMinBytes() == null);
   }
+
+  private static Function<Object, Object> converterFor(PrimitiveType parquetType, Type icebergType) {
+    Function<Object, Object> fromParquet = ParquetConversions.converterFromParquet(parquetType);
+    if (icebergType != null) {
+      if (icebergType.typeId() == Type.TypeID.LONG &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32) {
+        return value -> ((Integer) fromParquet.apply(value)).longValue();
+      } else if (icebergType.typeId() == Type.TypeID.DOUBLE &&
+          parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FLOAT) {
+        return value -> ((Float) fromParquet.apply(value)).doubleValue();
+      }
+    }
+
+    return fromParquet;
+  }
 }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
index a5e7a35..c02bf27 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
@@ -884,4 +884,12 @@ public class TestDictionaryRowGroupFilter {
         .shouldRead(parquetSchema, rowGroupMetadata, dictionaryStore);
     Assert.assertFalse("Should not read: notIn on no nulls column (empty string is within the set)", shouldRead);
   }
+
+  @Test
+  public void testTypePromotion() {
+    Schema promotedSchema = new Schema(required(1, "id", LongType.get()));
+    boolean shouldRead = new ParquetDictionaryRowGroupFilter(promotedSchema, equal("id", INT_MIN_VALUE + 1), true)
+        .shouldRead(parquetSchema, rowGroupMetadata, dictionaryStore);
+    Assert.assertTrue("Should succeed with promoted schema", shouldRead);
+  }
 }