Posted to commits@parquet.apache.org by ga...@apache.org on 2020/01/16 13:31:38 UTC
[parquet-mr] branch master updated: PARQUET-1765: Invalid filteredRowCount in InternalParquetRecordReader (#747)
This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 8c1bc9b PARQUET-1765: Invalid filteredRowCount in InternalParquetRecordReader (#747)
8c1bc9b is described below
commit 8c1bc9bcdeeac8178fecf61d18dc56913907fd46
Author: Gabor Szadovszky <ga...@apache.org>
AuthorDate: Thu Jan 16 14:31:27 2020 +0100
PARQUET-1765: Invalid filteredRowCount in InternalParquetRecordReader (#747)
---
.../hadoop/InternalParquetRecordReader.java | 8 +++-
.../filter2/recordlevel/PhoneBookWriter.java | 8 ++++
.../parquet/hadoop/TestColumnIndexFiltering.java | 50 ++++++++++++++++++++++
3 files changed, 64 insertions(+), 2 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index e57f3cb..80debd7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -180,12 +180,14 @@ class InternalParquetRecordReader<T> {
this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
this.requestedSchema = readContext.getRequestedSchema();
this.columnCount = requestedSchema.getPaths().size();
+ // Setting the projection schema before running any filtering (e.g. getting filtered record count)
+ // because projection impacts filtering
+ reader.setRequestedSchema(requestedSchema);
this.recordConverter = readSupport.prepareForRead(conf, fileMetadata, fileSchema, readContext);
this.strictTypeChecking = options.isEnabled(STRICT_TYPE_CHECKING, true);
this.total = reader.getFilteredRecordCount();
this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(options, total);
this.filterRecords = options.useRecordFilter();
- reader.setRequestedSchema(requestedSchema);
LOG.info("RecordReader initialized will read a total of {} records.", total);
}
@@ -201,13 +203,15 @@ class InternalParquetRecordReader<T> {
this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy());
this.requestedSchema = readContext.getRequestedSchema();
this.columnCount = requestedSchema.getPaths().size();
+ // Setting the projection schema before running any filtering (e.g. getting filtered record count)
+ // because projection impacts filtering
+ reader.setRequestedSchema(requestedSchema);
this.recordConverter = readSupport.prepareForRead(
configuration, fileMetadata, fileSchema, readContext);
this.strictTypeChecking = configuration.getBoolean(STRICT_TYPE_CHECKING, true);
this.total = reader.getFilteredRecordCount();
this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
this.filterRecords = configuration.getBoolean(RECORD_FILTERING_ENABLED, true);
- reader.setRequestedSchema(requestedSchema);
LOG.info("RecordReader initialized will read a total of {} records.", total);
}
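For context, a minimal sketch of the ordering dependency described by the comments added above: the projection (requested schema) has to be applied to the ParquetFileReader before getFilteredRecordCount() is called, otherwise the count is evaluated against the full file schema. This is not part of the commit; the file path, read options, and projection schema are placeholders, and only setRequestedSchema()/getFilteredRecordCount() come from the change itself.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.HadoopReadOptions;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.schema.MessageType;

    class FilteredCountSketch {
      static long filteredCount(Path file, MessageType projection, Configuration conf) throws java.io.IOException {
        ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
        try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(file, conf), options)) {
          // Projection first: the filtered record count is only meaningful once the
          // requested schema has been applied, which is what the reordering above enforces.
          reader.setRequestedSchema(projection);
          return reader.getFilteredRecordCount();
        }
      }
    }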
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
index 18ddca0..6355f35 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -199,6 +199,10 @@ public class PhoneBookWriter {
public String toString() {
return "User [id=" + id + ", name=" + name + ", phoneNumbers=" + phoneNumbers + ", location=" + location + "]";
}
+
+ public User cloneWithName(String name) {
+ return new User(id, name, phoneNumbers, location);
+ }
}
public static SimpleGroup groupFromUser(User user) {
@@ -257,6 +261,10 @@ public class PhoneBookWriter {
}
private static boolean isNull(Group group, String field) {
+ // Use null value if the field is not in the group schema
+ if (!group.getType().containsField(field)) {
+ return true;
+ }
int repetition = group.getFieldRepetitionCount(field);
if (repetition == 0) {
return true;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
index ccb6a03..c18212e 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
@@ -19,6 +19,7 @@
package org.apache.parquet.hadoop;
import static java.util.Collections.emptyList;
+import static java.util.stream.Collectors.toList;
import static org.apache.parquet.filter2.predicate.FilterApi.and;
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
@@ -33,6 +34,12 @@ import static org.apache.parquet.filter2.predicate.FilterApi.or;
import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Types.optional;
+import static org.apache.parquet.schema.Types.required;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -64,9 +71,12 @@ import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.Location;
import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.PhoneNumber;
import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
+import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -87,6 +97,19 @@ public class TestColumnIndexFiltering {
private static final List<User> DATA = Collections.unmodifiableList(generateData(10000));
private static final Path FILE_V1 = createTempFile();
private static final Path FILE_V2 = createTempFile();
+ private static final MessageType SCHEMA_WITHOUT_NAME = Types.buildMessage()
+ .required(INT64).named("id")
+ .optionalGroup()
+ .addField(optional(DOUBLE).named("lon"))
+ .addField(optional(DOUBLE).named("lat"))
+ .named("location")
+ .optionalGroup()
+ .repeatedGroup()
+ .addField(required(INT64).named("number"))
+ .addField(optional(BINARY).as(stringType()).named("kind"))
+ .named("phone")
+ .named("phoneNumbers")
+ .named("user_without_name");
@Parameters
public static Collection<Object[]> params() {
@@ -199,6 +222,16 @@ public class TestColumnIndexFiltering {
.useColumnIndexFilter(useColumnIndexFilter));
}
+ private List<User> readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering, boolean useColumnIndexFilter) throws IOException {
+ return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
+ .withFilter(filter)
+ .useDictionaryFilter(useOtherFiltering)
+ .useStatsFilter(useOtherFiltering)
+ .useRecordFilter(useOtherFiltering)
+ .useColumnIndexFilter(useColumnIndexFilter)
+ .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()));
+ }
+
// Assumes that both lists are in the same order
private static void assertContains(Stream<User> expected, List<User> actual) {
Iterator<User> expIt = expected.iterator();
@@ -441,4 +474,21 @@ public class TestColumnIndexFiltering {
or(eq(longColumn("id"), 1234l),
userDefined(longColumn("not-existing-long"), new IsDivisibleBy(1))));
}
+
+ @Test
+ public void testFilteringWithProjection() throws IOException {
+ // All rows shall be retrieved because all values in column 'name' shall be handled as null values
+ assertEquals(
+ DATA.stream().map(user -> user.cloneWithName(null)).collect(toList()),
+ readUsersWithProjection(FilterCompat.get(eq(binaryColumn("name"), null)), SCHEMA_WITHOUT_NAME, true, true));
+
+ // Column index filter shall drop all pages because all values in column 'name' shall be handled as null values
+ assertEquals(
+ emptyList(),
+ readUsersWithProjection(FilterCompat.get(notEq(binaryColumn("name"), null)), SCHEMA_WITHOUT_NAME, false, true));
+ assertEquals(
+ emptyList(),
+ readUsersWithProjection(FilterCompat.get(userDefined(binaryColumn("name"), NameStartsWithVowel.class)),
+ SCHEMA_WITHOUT_NAME, false, true));
+ }
}
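And a sketch of the reader-level scenario the new test covers: reading with a projection that drops the filtered column 'name', so every record is evaluated as name == null. The builder options mirror readUsersWithProjection above; the file path is a placeholder, and PhoneBookWriter.readUsers is assumed to drain the reader in an equivalent loop.

    import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.eq;

    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.api.ReadSupport;
    import org.apache.parquet.hadoop.example.GroupReadSupport;
    import org.apache.parquet.schema.MessageType;

    class ProjectedFilterReadSketch {
      static long countMatches(Path file, MessageType schemaWithoutName) throws java.io.IOException {
        long count = 0;
        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
            .withFilter(FilterCompat.get(eq(binaryColumn("name"), null)))
            .useColumnIndexFilter(true)
            .set(ReadSupport.PARQUET_READ_SCHEMA, schemaWithoutName.toString())
            .build()) {
          // 'name' is outside the projection, so each record is treated as name == null;
          // with the fix above, the reader's filtered row count matches the records returned here.
          for (Group user = reader.read(); user != null; user = reader.read()) {
            count++;
          }
        }
        return count;
      }
    }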