You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original page) was not preserved in this copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/04/11 15:38:38 UTC
[incubator-iceberg] branch master updated: Add case insensitive support to Parquet. (#141)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 2fdc6f7 Add case insensitive support to Parquet. (#141)
2fdc6f7 is described below
commit 2fdc6f72294ab6bd6ff781a0dd1726302bfd91cd
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Thu Apr 11 08:38:34 2019 -0700
Add case insensitive support to Parquet. (#141)
---
.../java/org/apache/iceberg/parquet/Parquet.java | 14 ++++++--
.../parquet/ParquetDictionaryRowGroupFilter.java | 6 +++-
.../org/apache/iceberg/parquet/ParquetFilters.java | 37 ++++------------------
.../parquet/ParquetMetricsRowGroupFilter.java | 6 +++-
.../org/apache/iceberg/parquet/ParquetReader.java | 13 +++++---
.../parquet/TestDictionaryRowGroupFilter.java | 6 ++++
.../iceberg/parquet/TestMetricsRowGroupFilter.java | 7 ++++
.../org/apache/iceberg/spark/source/Reader.java | 1 +
.../iceberg/spark/data/TestParquetAvroReader.java | 34 +++++++++++---------
.../iceberg/spark/data/TestParquetAvroWriter.java | 14 ++++----
10 files changed, 76 insertions(+), 62 deletions(-)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 1cb50d3..22c5f18 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -257,6 +257,7 @@ public class Parquet {
private ReadSupport<?> readSupport = null;
private Function<MessageType, ParquetValueReader<?>> readerFunc = null;
private boolean filterRecords = true;
+ private boolean caseSensitive = true;
private Map<String, String> properties = Maps.newHashMap();
private boolean callInit = false;
private boolean reuseContainers = false;
@@ -283,6 +284,15 @@ public class Parquet {
return this;
}
+ public ReadBuilder caseInsensitive() {
+ return caseSensitive(false);
+ }
+
+ public ReadBuilder caseSensitive(boolean caseSensitive) {
+ this.caseSensitive = caseSensitive;
+ return this;
+ }
+
public ReadBuilder filterRecords(boolean filterRecords) {
this.filterRecords = filterRecords;
return this;
@@ -339,7 +349,7 @@ public class Parquet {
ParquetReadOptions options = optionsBuilder.build();
return new org.apache.iceberg.parquet.ParquetReader<>(
- file, schema, options, readerFunc, filter, reuseContainers);
+ file, schema, options, readerFunc, filter, reuseContainers, caseSensitive);
}
ParquetReadBuilder<D> builder = new ParquetReadBuilder<>(ParquetIO.file(file));
@@ -374,7 +384,7 @@ public class Parquet {
builder.useStatsFilter()
.useDictionaryFilter()
.useRecordFilter(filterRecords)
- .withFilter(ParquetFilters.convert(fileSchema, filter));
+ .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
} else {
// turn off filtering
builder.useStatsFilter(false)
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
index 57f728c..58fe902 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetDictionaryRowGroupFilter.java
@@ -66,9 +66,13 @@ public class ParquetDictionaryRowGroupFilter {
}
public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound) {
+ this(schema, unbound, true);
+ }
+
+ public ParquetDictionaryRowGroupFilter(Schema schema, Expression unbound, boolean caseSensitive) {
this.schema = schema;
this.struct = schema.asStruct();
- this.expr = Binder.bind(struct, rewriteNot(unbound), true);
+ this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive);
}
/**
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
index 84bfe2a..b4a675e 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java
@@ -29,7 +29,6 @@ import org.apache.iceberg.expressions.ExpressionVisitors.ExpressionVisitor;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.expressions.UnboundPredicate;
-import org.apache.iceberg.types.Types;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
@@ -40,19 +39,8 @@ import static org.apache.iceberg.expressions.ExpressionVisitors.visit;
class ParquetFilters {
- static FilterCompat.Filter convert(Schema schema, Expression expr) {
- FilterPredicate pred = visit(expr, new ConvertFilterToParquet(schema));
- // TODO: handle AlwaysFalse.INSTANCE
- if (pred != null && pred != AlwaysTrue.INSTANCE) {
- // FilterCompat will apply LogicalInverseRewriter
- return FilterCompat.get(pred);
- } else {
- return FilterCompat.NOOP;
- }
- }
-
- static FilterCompat.Filter convertColumnFilter(Schema schema, String column, Expression expr) {
- FilterPredicate pred = visit(expr, new ConvertColumnFilterToParquet(schema, column));
+ static FilterCompat.Filter convert(Schema schema, Expression expr, boolean caseSensitive) {
+ FilterPredicate pred = visit(expr, new ConvertFilterToParquet(schema, caseSensitive));
// TODO: handle AlwaysFalse.INSTANCE
if (pred != null && pred != AlwaysTrue.INSTANCE) {
// FilterCompat will apply LogicalInverseRewriter
@@ -64,9 +52,11 @@ class ParquetFilters {
private static class ConvertFilterToParquet extends ExpressionVisitor<FilterPredicate> {
private final Schema schema;
+ private final boolean caseSensitive;
- private ConvertFilterToParquet(Schema schema) {
+ private ConvertFilterToParquet(Schema schema, boolean caseSensitive) {
this.schema = schema;
+ this.caseSensitive = caseSensitive;
}
@Override
@@ -160,7 +150,7 @@ class ParquetFilters {
}
protected Expression bind(UnboundPredicate<?> pred) {
- return pred.bind(schema.asStruct(), true);
+ return pred.bind(schema.asStruct(), caseSensitive);
}
@Override
@@ -178,21 +168,6 @@ class ParquetFilters {
}
}
- private static class ConvertColumnFilterToParquet extends ConvertFilterToParquet {
- private final Types.StructType partitionStruct;
-
- private ConvertColumnFilterToParquet(Schema schema, String column) {
- super(schema);
- this.partitionStruct = schema.findField(column).type().asNestedType().asStructType();
- }
-
- @Override
- protected Expression bind(UnboundPredicate<?> pred) {
- // instead of binding the predicate using the top-level schema, bind it to the partition data
- return pred.bind(partitionStruct, true);
- }
- }
-
private static
<C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt>
FilterPredicate pred(Operation op, COL col, C value) {
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
index 9b66ed8..4fe93dd 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetMetricsRowGroupFilter.java
@@ -56,9 +56,13 @@ public class ParquetMetricsRowGroupFilter {
}
public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound) {
+ this(schema, unbound, true);
+ }
+
+ public ParquetMetricsRowGroupFilter(Schema schema, Expression unbound, boolean caseSensitive) {
this.schema = schema;
this.struct = schema.asStruct();
- this.expr = Binder.bind(struct, rewriteNot(unbound), true);
+ this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive);
}
/**
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
index 2dd930b..653bb49 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java
@@ -49,10 +49,11 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
private final Function<MessageType, ParquetValueReader<?>> readerFunc;
private final Expression filter;
private final boolean reuseContainers;
+ private final boolean caseSensitive;
public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions options,
Function<MessageType, ParquetValueReader<?>> readerFunc,
- Expression filter, boolean reuseContainers) {
+ Expression filter, boolean reuseContainers, boolean caseSensitive) {
this.input = input;
this.expectedSchema = expectedSchema;
this.options = options;
@@ -60,6 +61,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
// replace alwaysTrue with null to avoid extra work evaluating a trivial filter
this.filter = filter == Expressions.alwaysTrue() ? null : filter;
this.reuseContainers = reuseContainers;
+ this.caseSensitive = caseSensitive;
}
private static class ReadConf<T> {
@@ -75,7 +77,8 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
@SuppressWarnings("unchecked")
ReadConf(InputFile file, ParquetReadOptions options, Schema expectedSchema, Expression filter,
- Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers) {
+ Function<MessageType, ParquetValueReader<?>> readerFunc, boolean reuseContainers,
+ boolean caseSensitive) {
this.file = file;
this.options = options;
this.reader = newReader(file, options);
@@ -95,8 +98,8 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
ParquetMetricsRowGroupFilter statsFilter = null;
ParquetDictionaryRowGroupFilter dictFilter = null;
if (filter != null) {
- statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter);
- dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter);
+ statsFilter = new ParquetMetricsRowGroupFilter(expectedSchema, filter, caseSensitive);
+ dictFilter = new ParquetDictionaryRowGroupFilter(expectedSchema, filter, caseSensitive);
}
long totalValues = 0L;
@@ -172,7 +175,7 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
private ReadConf<T> init() {
if (conf == null) {
ReadConf<T> conf = new ReadConf<>(
- input, options, expectedSchema, filter, readerFunc, reuseContainers);
+ input, options, expectedSchema, filter, readerFunc, reuseContainers, caseSensitive);
this.conf = conf.copy();
return conf;
}
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
index 1523ac9..20c4553 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestDictionaryRowGroupFilter.java
@@ -470,4 +470,10 @@ public class TestDictionaryRowGroupFilter {
Assert.assertFalse("Should skip: contains only ''", shouldRead);
}
+ @Test
+ public void testCaseInsensitive() {
+ boolean shouldRead = new ParquetDictionaryRowGroupFilter(SCHEMA, notEqual("no_Nulls", ""), false)
+ .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA, DICTIONARY_STORE);
+ Assert.assertFalse("Should skip: contains only ''", shouldRead);
+ }
}
diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java
index 0a34666..41a091c 100644
--- a/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java
+++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestMetricsRowGroupFilter.java
@@ -460,4 +460,11 @@ public class TestMetricsRowGroupFilter {
.shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
Assert.assertTrue("Should read: id above upper bound", shouldRead);
}
+
+ @Test
+ public void testCaseInsensitive() {
+ boolean shouldRead = new ParquetMetricsRowGroupFilter(SCHEMA, equal("ID", 5), false)
+ .shouldRead(PARQUET_SCHEMA, ROW_GROUP_METADATA);
+ Assert.assertFalse("Should not read: id below lower bound", shouldRead);
+ }
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index d241370..a74d9cd 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -462,6 +462,7 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
.split(task.start(), task.length())
.createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema))
.filter(task.residual())
+ .caseSensitive(caseSensitive)
.build();
}
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
index d2648e2..de98a4b 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroReader.java
@@ -25,15 +25,12 @@ import java.util.Iterator;
import org.apache.avro.generic.GenericData.Record;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetAvroValueReaders;
-import org.apache.iceberg.parquet.ParquetReader;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.types.Types;
-import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.schema.MessageType;
import org.junit.Assert;
import org.junit.Ignore;
@@ -90,6 +87,7 @@ public class TestParquetAvroReader {
);
File testFile = writeTestData(structSchema, 5_000_000, 1059);
+ // RandomData uses the root record name "test", which must match for records to be equal
MessageType readSchema = ParquetSchemaUtil.convert(structSchema, "test");
long sum = 0;
@@ -101,10 +99,11 @@ public class TestParquetAvroReader {
// clean up as much memory as possible to avoid a large GC during the timed run
System.gc();
- try (ParquetReader<Record> reader = new ParquetReader<>(
- Files.localInput(testFile), structSchema, ParquetReadOptions.builder().build(),
- fileSchema -> ParquetAvroValueReaders.buildReader(structSchema, readSchema),
- Expressions.alwaysTrue(), true)) {
+ try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+ .project(structSchema)
+ .createReaderFunc(
+ fileSchema -> ParquetAvroValueReaders.buildReader(structSchema, readSchema))
+ .build()) {
long start = System.currentTimeMillis();
long val = 0;
long count = 0;
@@ -136,6 +135,7 @@ public class TestParquetAvroReader {
@Ignore
public void testWithOldReadPath() throws IOException {
File testFile = writeTestData(COMPLEX_SCHEMA, 500_000, 1985);
+ // RandomData uses the root record name "test", which must match for records to be equal
MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");
for (int i = 0; i < 5; i += 1) {
@@ -162,10 +162,11 @@ public class TestParquetAvroReader {
// clean up as much memory as possible to avoid a large GC during the timed run
System.gc();
- try (ParquetReader<Record> reader = new ParquetReader<>(
- Files.localInput(testFile), COMPLEX_SCHEMA, ParquetReadOptions.builder().build(),
- fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema),
- Expressions.alwaysTrue(), true)) {
+ try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+ .project(COMPLEX_SCHEMA)
+ .createReaderFunc(
+ fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
+ .build()) {
long start = System.currentTimeMillis();
long val = 0;
long count = 0;
@@ -195,13 +196,16 @@ public class TestParquetAvroReader {
writer.addAll(records);
}
+ // RandomData uses the root record name "test", which must match for records to be equal
MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");
// verify that the new read path is correct
- try (ParquetReader<Record> reader = new ParquetReader<>(
- Files.localInput(testFile), COMPLEX_SCHEMA, ParquetReadOptions.builder().build(),
- fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema),
- Expressions.alwaysTrue(), true)) {
+ try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+ .project(COMPLEX_SCHEMA)
+ .createReaderFunc(
+ fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
+ .reuseContainers()
+ .build()) {
int i = 0;
Iterator<Record> iter = records.iterator();
for (Record actual : reader) {
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
index 5562a1d..411ed32 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestParquetAvroWriter.java
@@ -25,15 +25,13 @@ import java.util.Iterator;
import org.apache.avro.generic.GenericData.Record;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetAvroValueReaders;
import org.apache.iceberg.parquet.ParquetAvroWriter;
-import org.apache.iceberg.parquet.ParquetReader;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.types.Types;
-import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.schema.MessageType;
import org.junit.Assert;
import org.junit.Rule;
@@ -87,13 +85,15 @@ public class TestParquetAvroWriter {
writer.addAll(records);
}
+ // RandomData uses the root record name "test", which must match for records to be equal
MessageType readSchema = ParquetSchemaUtil.convert(COMPLEX_SCHEMA, "test");
// verify that the new read path is correct
- try (ParquetReader<Record> reader = new ParquetReader<>(
- Files.localInput(testFile), COMPLEX_SCHEMA, ParquetReadOptions.builder().build(),
- fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema),
- Expressions.alwaysTrue(), false)) {
+ try (CloseableIterable<Record> reader = Parquet.read(Files.localInput(testFile))
+ .project(COMPLEX_SCHEMA)
+ .createReaderFunc(
+ fileSchema -> ParquetAvroValueReaders.buildReader(COMPLEX_SCHEMA, readSchema))
+ .build()) {
int i = 0;
Iterator<Record> iter = records.iterator();
for (Record actual : reader) {