Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/04/11 15:38:38 UTC

[incubator-iceberg] branch master updated: Add case-insensitive support to Parquet. (#141)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fdc6f7  Add case-insensitive support to Parquet. (#141)
2fdc6f7 is described below

commit 2fdc6f72294ab6bd6ff781a0dd1726302bfd91cd
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Apr 11 08:38:34 2019 -0700

    Add case-insensitive support to Parquet. (#141)
---
 .../java/org/apache/iceberg/parquet/Parquet.java   | 14 ++++++--
 .../parquet/ParquetDictionaryRowGroupFilter.java   |  6 +++-
 .../org/apache/iceberg/parquet/ParquetFilters.java | 37 ++++------------------
 .../parquet/ParquetMetricsRowGroupFilter.java      |  6 +++-
 .../org/apache/iceberg/parquet/ParquetReader.java  | 13 +++++---
 .../parquet/TestDictionaryRowGroupFilter.java      |  6 ++++
 .../iceberg/parquet/TestMetricsRowGroupFilter.java |  7 ++++
 .../org/apache/iceberg/spark/source/Reader.java    |  1 +
 .../iceberg/spark/data/TestParquetAvroReader.java  | 34 +++++++++++---------
 .../iceberg/spark/data/TestParquetAvroWriter.java  | 14 ++++----
 10 files changed, 76 insertions(+), 62 deletions(-)

diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 1cb50d3..22c5f18 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -257,6 +257,7 @@ public class Parquet {
     private ReadSupport<?> readSupport = null;
     private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
     private boolean filterRecords = true;
+    private boolean caseSensitive = true;
     private Map<String, String> properties = Maps.newHashMap();
     private boolean callInit = false;
     private boolean reuseContainers = false;
@@ -283,6 +284,15 @@ public class Parquet {
       return this;
     }
 
+    public ReadBuilder caseInsensitive() {
+      return caseSensitive(false);
+    }
+
+    public ReadBuilder caseSensitive(boolean caseSensitive) {
+      this.caseSensitive = caseSensitive;
+      return this;
+    }
+
     public ReadBuilder filterRecords(boolean filterRecords) {
       this.filterRecords = filterRecords;
       return this;
@@ -339,7 +349,7 @@ public class Parquet {
         ParquetReadOptions options = optionsBuilder.build();
 
         return new org.apache.iceberg.parquet.ParquetReader<>(
-            file, schema, options, readerFunc, filter, reuseContainers);
+            file, schema, options, readerFunc, filter, reuseContainers, caseSensitive);
       }
 
       ParquetReadBuilder<D> builder = new ParquetReadBuilder<>(ParquetIO.file(file));
@@ -374,7 +384,7 @@ public class Parquet {
         builder.useStatsFilter()
             .useDictionaryFilter()
             .useRecordFilter(filterRecords)
-            .withFilter(ParquetFilters.convert(fileSchema, filter));
+            .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
       } else {
         // turn off filtering
         builder.useStatsFilter(false)
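
The ReadBuilder change above is additive: callers that never call caseSensitive(...) keep the previous case-sensitive binding, and caseInsensitive() is shorthand for caseSensitive(false). A minimal usage sketch, assuming a local file "data.parquet" and an Iceberg schema named schema (both hypothetical placeholders, not part of this commit):

    // Sketch only: "data.parquet" and schema are placeholders.
    CloseableIterable<Record> reader = Parquet.read(Files.localInput("data.parquet"))
        .project(schema)
        .createReaderFunc(fileSchema ->
            ParquetAvroValueReaders.buildReader(schema, fileSchema))
        .filter(Expressions.equal("ID", 5))  // "ID" binds to a column "id" when insensitive
        .caseInsensitive()                   // same as .caseSensitive(false)
        .build();
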
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index 57f728c..58fe902 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -66,9 +66,13 @@ public class ParquetDictionaryRowGroupFilter {
   }
 
   public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound) {
+    this(schema, unbound, true);
+  }
+
+  public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound, boolean caseSensitive) {
     this.schema = schema;
     this.struct = schema.asStruct();
-    this.expr = Binder.bind(struct, rewriteNot(unbound), true);
+    this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive);
   }
 
   /**
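
The original two-argument constructor now delegates with caseSensitive = true, so existing callers keep their behavior. A short sketch of the new overload, borrowing the SCHEMA, PARQUET_SCHEMA, ROW_GROUP_METADATA, and DICTIONARY_STORE fixtures from the test added below:

    // With caseSensitive = false, the predicate on "no_Nulls" binds to the
    // schema field "no_nulls" before the dictionaries are checked.
    ParquetDictionaryRowGroupFilter dictFilter =
        new ParquetDictionaryRowGroupFilter(SCHEMA, notEqual("no_Nulls", ""), false);
    boolean shouldRead =
        dictFilter.shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
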
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
index 84bfe2a..b4a675e 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
@@ -29,7 +29,6 @@ import org.apache.iceberg.expressions.ExpressionVisitors.ExpressionVisitor;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.expressions.UnboundPredicate;
-import org.apache.iceberg.types.Types;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.predicate.FilterApi;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -40,19 +39,8 @@ import static org.apache.iceberg.expressions.ExpressionVisitors.visit;
 
 class ParquetFilters {
 
-  static FilterCompat.Filter convert(Schema schema, Expression expr) {
-    FilterPredicate pred = visit(expr, new ConvertFilterToParquet(schema));
-    // TODO: handle AlwaysFalse.INSTANCE
-    if (pred != null && pred != AlwaysTrue.INSTANCE) {
-      // FilterCompat will apply LogicalInverseRewriter
-      return FilterCompat.get(pred);
-    } else {
-      return FilterCompat.NOOP;
-    }
-  }
-
-  static FilterCompat.Filter convertColumnFilter(Schema schema, String column, Expression expr) {
-    FilterPredicate pred = visit(expr, new ConvertColumnFilterToParquet(schema, column));
+  static FilterCompat.Filter convert(Schema schema, Expression expr, boolean caseSensitive) {
+    FilterPredicate pred = visit(expr, new ConvertFilterToParquet(schema, caseSensitive));
     // TODO: handle AlwaysFalse.INSTANCE
     if (pred != null && pred != AlwaysTrue.INSTANCE) {
       // FilterCompat will apply LogicalInverseRewriter
@@ -64,9 +52,11 @@ class ParquetFilters {
 
   private static class ConvertFilterToParquet extends ExpressionVisitor<FilterPredicate> {
     private final Schema schema;
+    private final boolean caseSensitive;
 
-    private ConvertFilterToParquet(Schema schema) {
+    private ConvertFilterToParquet(Schema schema, boolean caseSensitive) {
       this.schema = schema;
+      this.caseSensitive = caseSensitive;
     }
 
     @Override
@@ -160,7 +150,7 @@ class ParquetFilters {
     }
 
     protected Expression bind(UnboundPredicate<?> pred) {
-      return pred.bind(schema.asStruct(), true);
+      return pred.bind(schema.asStruct(), caseSensitive);
     }
 
     @Override
@@ -178,21 +168,6 @@ class ParquetFilters {
     }
   }
 
-  private static class ConvertColumnFilterToParquet extends ConvertFilterToParquet {
-    private final Types.StructType partitionStruct;
-
-    private ConvertColumnFilterToParquet(Schema schema, String column) {
-      super(schema);
-      this.partitionStruct = schema.findField(column).type().asNestedType().asStructType();
-    }
-
-    @Override
-    protected Expression bind(UnboundPredicate<?> pred) {
-      // instead of binding the predicate using the top-level schema, bind it to the partition data
-      return pred.bind(partitionStruct, true);
-    }
-  }
-
   private static
   <C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt>
   FilterPredicate pred(Operation op, COL col, C value) {
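
With convertColumnFilter and ConvertColumnFilterToParquet removed, the visitor carries a single caseSensitive flag through to predicate binding. What that binding step does, sketched with a hypothetical schema SCHEMA that contains a field named "id":

    // With caseSensitive = false, a predicate written against "ID"
    // resolves to the schema field "id".
    Expression bound = Expressions.equal("ID", 5)
        .bind(SCHEMA.asStruct(), false /* caseSensitive */);
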
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index 9b66ed8..4fe93dd 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -56,9 +56,13 @@ public class ParquetMetricsRowGroupFilter {
   }
 
   public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound) {
+    this(schema, unbound, true);
+  }
+
+  public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound, boolean caseSensitive) {
     this.schema = schema;
     this.struct = schema.asStruct();
-    this.expr = Binder.bind(struct, rewriteNot(unbound), true);
+    this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive);
   }
 
   /**
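
ParquetMetricsRowGroupFilter mirrors the dictionary filter: the two-argument constructor delegates with caseSensitive = true. Its shouldRead takes only the file schema and row-group metadata, since it prunes on column statistics rather than dictionary pages. Sketched with the same test fixtures as below:

    // With caseSensitive = false, the predicate on "ID" is evaluated
    // against the statistics of column "id".
    ParquetMetricsRowGroupFilter statsFilter =
        new ParquetMetricsRowGroupFilter(SCHEMA, equal("ID", 5), false);
    boolean shouldRead = statsFilter.shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
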
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index 2dd930b..653bb49 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -49,10 +49,11 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
   private final Function<MessageType, ParquetValueReader<?>> readerFunc;
   private final Expression filter;
   private final boolean reuseContainers;
+  private final boolean caseSensitive;
 
   public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions options,
                        Function<MessageType, ParquetValueReader<?>> readerFunc,
-                       Expression filter, boolean reuseContainers) {
+                       Expression filter, boolean reuseContainers, boolean caseSensitive) {
     this.input = input;
     this.expectedSchema = expectedSchema;
     this.options = options;
@@ -60,6 +61,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
     // replace alwaysTrue with null to avoid extra work evaluating a trivial filter
     this.filter = filter == Expressions.alwaysTrue() ? null : filter;
     this.reuseContainers = reuseContainers;
+    this.caseSensitive = caseSensitive;
   }
 
   private static class ReadConf<T> {
@@ -75,7 +77,8 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
 
     @SuppressWarnings("unchecked")
     ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
-             Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers) {
+             Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers,
+             boolean caseSensitive) {
       this.file = file;
       this.options = options;
       this.reader = newReader(file, options);
@@ -95,8 +98,8 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
       ParquetMetricsRowGroupFilter statsFilter = null;
       ParquetDictionaryRowGroupFilter dictFilter = null;
       if (filter != null) {
-        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter);
-        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter);
+        statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+        dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
       }
 
       long totalValues = 0L;
@@ -172,7 +175,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
   private ReadConf<T> init() {
     if (conf == null) {
       ReadConf<T> conf = new ReadConf<>(
-          input, options, expectedSchema, filter, readerFunc, reuseContainers);
+          input, options, expectedSchema, filter, readerFunc, reuseContainers, caseSensitive);
       this.conf = conf.copy();
       return conf;
     }
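
The ParquetReader constructor now takes seven arguments; the Parquet.read(...) builder is the friendlier entry point, which is why the test changes below migrate away from direct construction. For completeness, a direct-construction sketch matching the new signature (testFile and schema are placeholders):

    // Prefer the builder; shown only to illustrate the new constructor shape.
    ParquetReader<Record> reader = new ParquetReader<>(
        Files.localInput(testFile), schema, ParquetReadOptions.builder().build(),
        fileSchema -> ParquetAvroValueReaders.buildReader(schema, fileSchema),
        Expressions.alwaysTrue(), true /* reuseContainers */, true /* caseSensitive */);
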
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
index 1523ac9..20c4553 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
@@ -470,4 +470,10 @@ public class TestDictionaryRowGroupFilter {
     Assert.assertFalse("Should skip: contains only ''", shouldRead);
   }
 
+  @Test
+  public void testCaseInsensitive() {
+    boolean shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, notEqual("no_Nulls", ""), false)
+        .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+    Assert.assertFalse("Should skip: contains only ''", shouldRead);
+  }
 }
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java
index 0a34666..41a091c 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java
@@ -460,4 +460,11 @@ public class TestMetricsRowGroupFilter {
         .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
     Assert.assertTrue("Should read: id above upper bound", shouldRead);
   }
+
+  @Test
+  public void testCaseInsensitive() {
+    boolean shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, equal("ID", 5), false)
+        .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+    Assert.assertFalse("Should not read: id below lower bound", shouldRead);
+  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index d241370..a74d9cd 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -462,6 +462,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
           .split(task.start(), task.length())
           .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
           .filter(task.residual())
+          .caseSensitive(caseSensitive)
           .build();
     }
   }
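
The Spark reader forwards its existing caseSensitive flag into the Parquet read builder. Assuming that flag reflects Spark's spark.sql.caseSensitive session setting (an assumption about the surrounding Reader code, which this hunk does not show), end-to-end usage would look roughly like:

    // Hypothetical sketch; the wiring from spark.sql.caseSensitive to
    // Reader's caseSensitive field is assumed, not shown in this diff.
    spark.conf().set("spark.sql.caseSensitive", "false");
    Dataset<Row> df = spark.read().format("iceberg").load("db.table");
    df.filter("ID > 5").show();  // the pushed-down filter on "ID" binds to column "id"
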
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
index d2648e2..de98a4b 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
@@ -25,15 +25,12 @@ import java.util.Iterator;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetAvroValueReaders;
-import org.apache.iceberg.parquet.ParquetReader;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.types.Types;
-import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.schema.MessageType;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -90,6 +87,7 @@ public class TestParquetAvroReader {
     );
 
     File testFile = writeTestData(structSchema, 5_000_000, 1059);
+    // RandomData uses the root record name "test", which must match for records to be equal
     MessageType readSchema = ParquetSchemaUtil.convert(structSchema, "test");
 
     long sum = 0;
@@ -101,10 +99,11 @@ public class TestParquetAvroReader {
       // clean up as much memory as possible to avoid a large GC during the timed run
       System.gc();
 
-      try (ParquetReader<Record> reader = new ParquetReader<>(
-          Files.localInput(testFile), structSchema, ParquetReadOptions.builder().build(),
-          fileSchema -> ParquetAvroValueReaders.buildReader(structSchema, readSchema),
-          Expressions.alwaysTrue(), true)) {
+      try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+          .project(structSchema)
+          .createReaderFunc(
+              fileSchema -> ParquetAvroValueReaders.buildReader(structSchema, readSchema))
+          .build()) {
         long start = System.currentTimeMillis();
         long val = 0;
         long count = 0;
@@ -136,6 +135,7 @@ public class TestParquetAvroReader {
   @Ignore
   public void testWithOldReadPath() throws IOException {
     File testFile = writeTestData(COMPLEX_SCHEMA, 500_000, 1985);
+    // RandomData uses the root record name "test", which must match for records to be equal
     MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");
 
     for (int i = 0; i < 5; i += 1) {
@@ -162,10 +162,11 @@ public class TestParquetAvroReader {
       // clean up as much memory as possible to avoid a large GC during the timed run
       System.gc();
 
-      try (ParquetReader<Record> reader = new ParquetReader<>(
-          Files.localInput(testFile), COMPLEX_SCHEMA, ParquetReadOptions.builder().build(),
-          fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema),
-          Expressions.alwaysTrue(), true)) {
+      try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+          .project(COMPLEX_SCHEMA)
+          .createReaderFunc(
+              fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
+          .build()) {
         long start = System.currentTimeMillis();
         long val = 0;
         long count = 0;
@@ -195,13 +196,16 @@ public class TestParquetAvroReader {
       writer.addAll(records);
     }
 
+    // RandomData uses the root record name "test", which must match for records to be equal
     MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");
 
     // verify that the new read path is correct
-    try (ParquetReader<Record> reader = new ParquetReader<>(
-        Files.localInput(testFile), COMPLEX_SCHEMA, ParquetReadOptions.builder().build(),
-        fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema),
-        Expressions.alwaysTrue(), true)) {
+    try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+        .project(COMPLEX_SCHEMA)
+        .createReaderFunc(
+            fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
+        .reuseContainers()
+        .build()) {
       int i = 0;
       Iterator<Record> iter = records.iterator();
       for (Record actual : reader) {
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
index 5562a1d..411ed32 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
@@ -25,15 +25,13 @@ import java.util.Iterator;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.parquet.ParquetAvroValueReaders;
 import org.apache.iceberg.parquet.ParquetAvroWriter;
-import org.apache.iceberg.parquet.ParquetReader;
 import org.apache.iceberg.parquet.ParquetSchemaUtil;
 import org.apache.iceberg.types.Types;
-import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.schema.MessageType;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -87,13 +85,15 @@ public class TestParquetAvroWriter {
       writer.addAll(records);
     }
 
+    // RandomData uses the root record name "test", which must match for records to be equal
     MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");
 
     // verify that the new read path is correct
-    try (ParquetReader<Record> reader = new ParquetReader<>(
-        Files.localInput(testFile), COMPLEX_SCHEMA, ParquetReadOptions.builder().build(),
-        fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema),
-        Expressions.alwaysTrue(), false)) {
+    try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+        .project(COMPLEX_SCHEMA)
+        .createReaderFunc(
+            fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
+        .build()) {
       int i = 0;
       Iterator<Record> iter = records.iterator();
       for (Record actual : reader) {