You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2016/12/20 22:36:03 UTC
parquet-mr git commit: PARQUET-801: Allow UserDefinedPredicates in DictionaryFilter
Repository: parquet-mr
Updated Branches:
refs/heads/master 71cff7c59 -> 89e0607cf
PARQUET-801: Allow UserDefinedPredicates in DictionaryFilter
Author: Patrick Woody <pw...@palantir.com>
Author: Patrick Woody <pa...@gmail.com>
Closes #394 from pwoody/pw/dictionaryUdp and squashes the following commits:
d8499a0 [Patrick Woody] short circuiting and style changes
4cb9f0c [Patrick Woody] more missing imports
1ec0d39 [Patrick Woody] fix missing import
3ee4489 [Patrick Woody] PARQUET-801: Allow UserDefinedPredicates in DictionaryFilter
Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/89e0607c
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/89e0607c
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/89e0607c
Branch: refs/heads/master
Commit: 89e0607cf6470dda1a6a47b46abf37468df4e50f
Parents: 71cff7c
Author: Patrick Woody <pw...@palantir.com>
Authored: Tue Dec 20 14:35:57 2016 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Tue Dec 20 14:35:57 2016 -0800
----------------------------------------------------------------------
.../dictionarylevel/DictionaryFilter.java | 56 +++++++++---
.../dictionarylevel/DictionaryFilterTest.java | 94 ++++++++++++++++++++
2 files changed, 140 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/89e0607c/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
index 91f3007..19604ec 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
@@ -212,8 +212,8 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
return BLOCK_MIGHT_MATCH;
}
- for(T entry : dictSet) {
- if(value.compareTo(entry) > 0) {
+ for (T entry : dictSet) {
+ if (value.compareTo(entry) > 0) {
return BLOCK_MIGHT_MATCH;
}
}
@@ -253,8 +253,8 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
return BLOCK_MIGHT_MATCH;
}
- for(T entry : dictSet) {
- if(value.compareTo(entry) >= 0) {
+ for (T entry : dictSet) {
+ if (value.compareTo(entry) >= 0) {
return BLOCK_MIGHT_MATCH;
}
}
@@ -292,8 +292,8 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
return BLOCK_MIGHT_MATCH;
}
- for(T entry : dictSet) {
- if(value.compareTo(entry) < 0) {
+ for (T entry : dictSet) {
+ if (value.compareTo(entry) < 0) {
return BLOCK_MIGHT_MATCH;
}
}
@@ -333,8 +333,8 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
return BLOCK_MIGHT_MATCH;
}
- for(T entry : dictSet) {
- if(value.compareTo(entry) <= 0) {
+ for (T entry : dictSet) {
+ if (value.compareTo(entry) <= 0) {
return BLOCK_MIGHT_MATCH;
}
}
@@ -363,14 +363,50 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
"This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
}
+  /**
+   * Decides whether a row group can be dropped for a user-defined predicate by
+   * evaluating the predicate against every entry of the column chunk's dictionary.
+   *
+   * @param ud the user-defined predicate together with the column it applies to
+   * @param inverted true when evaluating NOT(ud): a value then matches when the
+   *                 predicate does NOT keep it
+   * @return BLOCK_CANNOT_MATCH only when no value in the chunk (including a
+   *         possible null) can satisfy the possibly-inverted predicate;
+   *         BLOCK_MIGHT_MATCH otherwise, including when the dictionary cannot be
+   *         read or the chunk has non-dictionary pages
+   */
+  private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> ud, boolean inverted) {
+    Column<T> filterColumn = ud.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+    U udp = ud.getUserDefinedPredicate();
+
+    // The column is missing, thus all null. The block can match only if a null
+    // value satisfies the (possibly inverted) predicate.
+    if (meta == null) {
+      return udp.keep(null) != inverted ? BLOCK_MIGHT_MATCH : BLOCK_CANNOT_MATCH;
+    }
+
+    if (hasNonDictionaryPages(meta)) {
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    // Dictionaries only contain non-null values, so they say nothing about rows
+    // where this column is null. Unless the chunk statistics prove there are no
+    // nulls, a predicate that is satisfied by null must keep the block.
+    // keep(null) is only consulted when nulls may actually be present.
+    boolean mayContainNull = meta.getStatistics() == null
+        || meta.getStatistics().isEmpty()
+        || meta.getStatistics().getNumNulls() > 0;
+    if (mayContainNull && udp.keep(null) != inverted) {
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    try {
+      Set<T> dictSet = expandDictionary(meta);
+      if (dictSet == null) {
+        return BLOCK_MIGHT_MATCH;
+      }
+
+      for (T entry : dictSet) {
+        // keep != inverted: this entry satisfies the predicate (or its negation).
+        if (udp.keep(entry) != inverted) {
+          return BLOCK_MIGHT_MATCH;
+        }
+      }
+      return BLOCK_CANNOT_MATCH;
+    } catch (IOException e) {
+      LOG.warn("Failed to process dictionary for filter evaluation.", e);
+    }
+
+    // Dictionary could not be read: be conservative and keep the block.
+    return BLOCK_MIGHT_MATCH;
+  }
+
@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> udp) {
- throw new UnsupportedOperationException("UDP not supported with dictionary evaluation.");
+ // Evaluate the predicate as-is against the column's dictionary (inverted = false).
+ return visit(udp, false);
}
@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(LogicalNotUserDefined<T, U> udp) {
- throw new UnsupportedOperationException("UDP not supported with dictionary evaluation.");
+ // NOT(udp): unwrap the inner predicate and evaluate it with inverted = true,
+ // so a dictionary entry matches when the predicate does NOT keep it.
+ return visit(udp.getUserDefined(), true);
}
@SuppressWarnings("deprecation")
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/89e0607c/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
index eca6332..3883d87 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
@@ -19,6 +19,9 @@
package org.apache.parquet.filter2.dictionarylevel;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -27,11 +30,14 @@ import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
import org.apache.parquet.filter2.predicate.Operators.IntColumn;
import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.filter2.predicate.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
@@ -47,6 +53,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -331,6 +338,43 @@ public class DictionaryFilterTest {
canDrop(or(x, y), ccmd, dictionaries));
}
+
+  @Test
+  public void testUdp() throws Exception {
+    // The assertions below expect 42 to be absent from the int32_field
+    // dictionary and 205 to be present in it.
+    InInt32UDP droppable = new InInt32UDP(ImmutableSet.of(42));
+    InInt32UDP undroppable = new InInt32UDP(ImmutableSet.of(205));
+
+    assertTrue("Should drop block for non-matching UDP",
+        canDrop(userDefined(intColumn("int32_field"), droppable), ccmd, dictionaries));
+
+    assertFalse("Should not drop block for matching UDP",
+        canDrop(userDefined(intColumn("int32_field"), undroppable), ccmd, dictionaries));
+  }
+
+ @Test
+ public void testInverseUdp() throws Exception {
+ InInt32UDP droppable = new InInt32UDP(ImmutableSet.of(42));
+ InInt32UDP undroppable = new InInt32UDP(ImmutableSet.of(205));
+ Set<Integer> allValues = ImmutableSet.copyOf(Arrays.asList(ArrayUtils.toObject(intValues)));
+ InInt32UDP completeMatch = new InInt32UDP(allValues);
+
+ FilterPredicate inverse =
+ LogicalInverseRewriter.rewrite(not(userDefined(intColumn("int32_field"), droppable)));
+ FilterPredicate inverse1 =
+ LogicalInverseRewriter.rewrite(not(userDefined(intColumn("int32_field"), undroppable)));
+ FilterPredicate inverse2 =
+ LogicalInverseRewriter.rewrite(not(userDefined(intColumn("int32_field"), completeMatch)));
+
+ assertFalse("Should not drop block for inverse of non-matching UDP",
+ canDrop(inverse, ccmd, dictionaries));
+
+ assertFalse("Should not drop block for inverse of UDP with some matches",
+ canDrop(inverse1, ccmd, dictionaries));
+
+ assertTrue("Should drop block for inverse of UDP with all matches",
+ canDrop(inverse2, ccmd, dictionaries));
+ }
+
@Test
public void testColumnWithoutDictionary() throws Exception {
IntColumn plain = intColumn("plain_int32_field");
@@ -437,6 +481,56 @@ public class DictionaryFilterTest {
canDrop(gtEq(b, Binary.fromString("any")), ccmd, dictionaries));
}
+ @Test
+ public void testUdpMissingColumn() throws Exception {
+ InInt32UDP nullRejecting = new InInt32UDP(ImmutableSet.of(42));
+ InInt32UDP nullAccepting = new InInt32UDP(Sets.newHashSet((Integer) null));
+ IntColumn fake = intColumn("missing_column");
+
+ assertTrue("Should drop block for null rejecting udp",
+ canDrop(userDefined(fake, nullRejecting), ccmd, dictionaries));
+ assertFalse("Should not drop block for null accepting udp",
+ canDrop(userDefined(fake, nullAccepting), ccmd, dictionaries));
+ }
+
+
+ @Test
+ public void testInverseUdpMissingColumn() throws Exception {
+ InInt32UDP nullRejecting = new InInt32UDP(ImmutableSet.of(42));
+ InInt32UDP nullAccepting = new InInt32UDP(Sets.newHashSet((Integer) null));
+ IntColumn fake = intColumn("missing_column");
+
+ assertTrue("Should drop block for null accepting udp",
+ canDrop(LogicalInverseRewriter.rewrite(not(userDefined(fake, nullAccepting))), ccmd, dictionaries));
+ assertFalse("Should not drop block for null rejecting udp",
+ canDrop(LogicalInverseRewriter.rewrite(not(userDefined(fake, nullRejecting))), ccmd, dictionaries));
+ }
+
+
+  /**
+   * Test predicate that keeps a value iff it is a member of a fixed set of
+   * integers. Put {@code null} in the set to make the predicate accept nulls.
+   */
+  private static final class InInt32UDP extends UserDefinedPredicate<Integer> implements Serializable {
+
+    private final Set<Integer> accepted;
+
+    InInt32UDP(Set<Integer> accepted) {
+      this.accepted = accepted;
+    }
+
+    @Override
+    public boolean keep(Integer value) {
+      return accepted.contains(value);
+    }
+
+    /** Statistics-based pruning is disabled; these tests rely on dictionary filtering. */
+    @Override
+    public boolean canDrop(Statistics<Integer> statistics) {
+      return false;
+    }
+
+    /** Statistics-based pruning of the inverse is disabled as well. */
+    @Override
+    public boolean inverseCanDrop(Statistics<Integer> statistics) {
+      return false;
+    }
+  }
+
private static double toDouble(int value) {
return (value * 1.0);
}