You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2014/07/29 23:39:17 UTC
[3/4] Add a unified and optionally more constrained API for
expressing filters on columns
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
new file mode 100644
index 0000000..4cdedf2
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
@@ -0,0 +1,91 @@
+package parquet.filter2.recordlevel;
+
+import parquet.column.Dictionary;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import parquet.io.api.Binary;
+import parquet.io.api.PrimitiveConverter;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * see {@link FilteringRecordMaterializer}
+ *
+ * This pass-through proxy for a delegate {@link PrimitiveConverter} also
+ * updates the {@link ValueInspector}s of a {@link IncrementallyUpdatedFilterPredicate}
+ * as each primitive value for the current record arrives, so the predicate can be
+ * evaluated incrementally during record assembly.
+ */
+public class FilteringPrimitiveConverter extends PrimitiveConverter {
+ // the real converter, which receives every value after the inspectors have seen it
+ private final PrimitiveConverter delegate;
+ // all leaf predicate nodes that watch the column this converter is bound to
+ private final ValueInspector[] valueInspectors;
+
+ public FilteringPrimitiveConverter(PrimitiveConverter delegate, ValueInspector[] valueInspectors) {
+ this.delegate = checkNotNull(delegate, "delegate");
+ this.valueInspectors = checkNotNull(valueInspectors, "valueInspectors");
+ }
+
+ // TODO: this works, but
+ // TODO: essentially turns off the benefits of dictionary support
+ // TODO: even if the underlying delegate supports it.
+ // TODO: we should support it here. (https://issues.apache.org/jira/browse/PARQUET-36)
+ @Override
+ public boolean hasDictionarySupport() {
+ return false;
+ }
+
+ @Override
+ public void setDictionary(Dictionary dictionary) {
+ throw new UnsupportedOperationException("FilteringPrimitiveConverter doesn't have dictionary support");
+ }
+
+ @Override
+ public void addValueFromDictionary(int dictionaryId) {
+ throw new UnsupportedOperationException("FilteringPrimitiveConverter doesn't have dictionary support");
+ }
+
+ // Each addXxx method below first feeds the value to every registered
+ // ValueInspector, then forwards it unchanged to the delegate converter.
+ @Override
+ public void addBinary(Binary value) {
+ for (ValueInspector valueInspector : valueInspectors) {
+ valueInspector.update(value);
+ }
+ delegate.addBinary(value);
+ }
+
+ @Override
+ public void addBoolean(boolean value) {
+ for (ValueInspector valueInspector : valueInspectors) {
+ valueInspector.update(value);
+ }
+ delegate.addBoolean(value);
+ }
+
+ @Override
+ public void addDouble(double value) {
+ for (ValueInspector valueInspector : valueInspectors) {
+ valueInspector.update(value);
+ }
+ delegate.addDouble(value);
+ }
+
+ @Override
+ public void addFloat(float value) {
+ for (ValueInspector valueInspector : valueInspectors) {
+ valueInspector.update(value);
+ }
+ delegate.addFloat(value);
+ }
+
+ @Override
+ public void addInt(int value) {
+ for (ValueInspector valueInspector : valueInspectors) {
+ valueInspector.update(value);
+ }
+ delegate.addInt(value);
+ }
+
+ @Override
+ public void addLong(long value) {
+ for (ValueInspector valueInspector : valueInspectors) {
+ valueInspector.update(value);
+ }
+ delegate.addLong(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringRecordMaterializer.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
new file mode 100644
index 0000000..41dd5d3
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
@@ -0,0 +1,97 @@
+package parquet.filter2.recordlevel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import parquet.io.PrimitiveColumnIO;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.RecordMaterializer;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * A pass-through proxy for a {@link RecordMaterializer} that updates a {@link IncrementallyUpdatedFilterPredicate}
+ * as it receives concrete values for the current record. If, after the record assembly signals that
+ * there are no more values, the predicate indicates that this record should be dropped, {@link #getCurrentRecord()}
+ * returns null to signal that this record is being skipped.
+ * Otherwise, the record is retrieved from the delegate.
+ */
+public class FilteringRecordMaterializer<T> extends RecordMaterializer<T> {
+ // the real record materializer
+ private final RecordMaterializer<T> delegate;
+
+ // the proxied root converter
+ private final FilteringGroupConverter rootConverter;
+
+ // the predicate
+ private final IncrementallyUpdatedFilterPredicate filterPredicate;
+
+ /**
+ * @param delegate the real materializer that assembles the record
+ * @param columnIOs the primitive (leaf) columns of the schema being read
+ * @param valueInspectorsByColumn the predicate's leaf nodes, grouped by the column each one watches
+ * @param filterPredicate the stateful predicate evaluated once per record
+ */
+ public FilteringRecordMaterializer(
+ RecordMaterializer<T> delegate,
+ List<PrimitiveColumnIO> columnIOs,
+ Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn,
+ IncrementallyUpdatedFilterPredicate filterPredicate) {
+
+ checkNotNull(columnIOs, "columnIOs");
+ checkNotNull(valueInspectorsByColumn, "valueInspectorsByColumn");
+ this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
+ this.delegate = checkNotNull(delegate, "delegate");
+
+ // keep track of which path of indices leads to which primitive column
+ Map<List<Integer>, PrimitiveColumnIO> columnIOsByIndexFieldPath = new HashMap<List<Integer>, PrimitiveColumnIO>();
+
+ for (PrimitiveColumnIO c : columnIOs) {
+ columnIOsByIndexFieldPath.put(getIndexFieldPathList(c), c);
+ }
+
+ // create a proxy for the delegate's root converter
+ this.rootConverter = new FilteringGroupConverter(
+ delegate.getRootConverter(), Collections.<Integer>emptyList(), valueInspectorsByColumn, columnIOsByIndexFieldPath);
+ }
+
+ // the index field path as a List so it can be used as a HashMap key (int[] can't)
+ public static List<Integer> getIndexFieldPathList(PrimitiveColumnIO c) {
+ return intArrayToList(c.getIndexFieldPath());
+ }
+
+ public static List<Integer> intArrayToList(int[] arr) {
+ List<Integer> list = new ArrayList<Integer>(arr.length);
+ for (int i : arr) {
+ list.add(i);
+ }
+ return list;
+ }
+
+
+
+ @Override
+ public T getCurrentRecord() {
+
+ // find out if the predicate thinks we should keep this record
+ boolean keep = IncrementallyUpdatedFilterPredicateEvaluator.evaluate(filterPredicate);
+
+ // reset the stateful predicate no matter what
+ IncrementallyUpdatedFilterPredicateResetter.reset(filterPredicate);
+
+ if (keep) {
+ return delegate.getCurrentRecord();
+ } else {
+ // signals a skip
+ return null;
+ }
+ }
+
+ @Override
+ public void skipCurrentRecord() {
+ delegate.skipCurrentRecord();
+ }
+
+ @Override
+ public GroupConverter getRootConverter() {
+ return rootConverter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
new file mode 100644
index 0000000..457f0c9
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
@@ -0,0 +1,139 @@
+package parquet.filter2.recordlevel;
+
+import parquet.io.api.Binary;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * A rewritten version of a {@link parquet.filter2.predicate.FilterPredicate} which receives
+ * the values for a record's columns one by one and internally tracks whether the predicate is
+ * satisfied, unsatisfied, or unknown.
+ *
+ * This is used to apply a predicate during record assembly, without assembling a second copy of
+ * a record, and without building a stack of update events.
+ *
+ * IncrementallyUpdatedFilterPredicate is implemented via the visitor pattern, as is
+ * {@link parquet.filter2.predicate.FilterPredicate}
+ */
+public interface IncrementallyUpdatedFilterPredicate {
+
+ /**
+ * A Visitor for an {@link IncrementallyUpdatedFilterPredicate}, per the visitor pattern.
+ */
+ public static interface Visitor {
+ boolean visit(ValueInspector p);
+ boolean visit(And and);
+ boolean visit(Or or);
+ }
+
+ /**
+ * An {@link IncrementallyUpdatedFilterPredicate} must accept a {@link Visitor}, per the visitor pattern.
+ */
+ boolean accept(Visitor visitor);
+
+ /**
+ * This is the leaf node of a filter predicate. It receives the value for the primitive column it represents,
+ * and decides whether or not the predicate represented by this node is satisfied.
+ *
+ * It is stateful, and needs to be reset after use.
+ */
+ public static abstract class ValueInspector implements IncrementallyUpdatedFilterPredicate {
+ // package private constructor
+ ValueInspector() { }
+
+ // result is only meaningful once isKnown is true
+ private boolean result = false;
+ private boolean isKnown = false;
+
+ // these methods signal what the value is
+ // subclasses override only the update() overloads for the column type they inspect;
+ // the rest throw UnsupportedOperationException
+ public void updateNull() { throw new UnsupportedOperationException(); }
+ public void update(int value) { throw new UnsupportedOperationException(); }
+ public void update(long value) { throw new UnsupportedOperationException(); }
+ public void update(double value) { throw new UnsupportedOperationException(); }
+ public void update(float value) { throw new UnsupportedOperationException(); }
+ public void update(boolean value) { throw new UnsupportedOperationException(); }
+ public void update(Binary value) { throw new UnsupportedOperationException(); }
+
+ /**
+ * Reset to clear state and begin evaluating the next record.
+ */
+ public final void reset() {
+ isKnown = false;
+ result = false;
+ }
+
+ /**
+ * Subclasses should call this method to signal that the result of this predicate is known.
+ * Calling it twice without an intervening {@link #reset()} is an error.
+ */
+ protected final void setResult(boolean result) {
+ if (isKnown) {
+ throw new IllegalStateException("setResult() called on a ValueInspector whose result is already known!"
+ + " Did you forget to call reset()?");
+ }
+ this.result = result;
+ this.isKnown = true;
+ }
+
+ /**
+ * Should only be called if {@link #isKnown} returns true.
+ */
+ public final boolean getResult() {
+ if (!isKnown) {
+ throw new IllegalStateException("getResult() called on a ValueInspector whose result is not yet known!");
+ }
+ return result;
+ }
+
+ /**
+ * Return true if this inspector has received a value yet, false otherwise.
+ */
+ public final boolean isKnown() {
+ return isKnown;
+ }
+
+ @Override
+ public boolean accept(Visitor visitor) {
+ return visitor.visit(this);
+ }
+ }
+
+ // base class for and / or
+ static abstract class BinaryLogical implements IncrementallyUpdatedFilterPredicate {
+ private final IncrementallyUpdatedFilterPredicate left;
+ private final IncrementallyUpdatedFilterPredicate right;
+
+ BinaryLogical(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
+ this.left = checkNotNull(left, "left");
+ this.right = checkNotNull(right, "right");
+ }
+
+ public final IncrementallyUpdatedFilterPredicate getLeft() {
+ return left;
+ }
+
+ public final IncrementallyUpdatedFilterPredicate getRight() {
+ return right;
+ }
+ }
+
+ public static final class Or extends BinaryLogical {
+ Or(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
+ super(left, right);
+ }
+
+ @Override
+ public boolean accept(Visitor visitor) {
+ return visitor.visit(this);
+ }
+ }
+
+ public static final class And extends BinaryLogical {
+ And(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
+ super(left, right);
+ }
+
+ @Override
+ public boolean accept(Visitor visitor) {
+ return visitor.visit(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
new file mode 100644
index 0000000..9481738
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
@@ -0,0 +1,79 @@
+package parquet.filter2.recordlevel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.predicate.FilterPredicate;
+import parquet.filter2.predicate.FilterPredicate.Visitor;
+import parquet.filter2.predicate.Operators.And;
+import parquet.filter2.predicate.Operators.Not;
+import parquet.filter2.predicate.Operators.Or;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+
+import static parquet.Preconditions.checkArgument;
+
+/**
+ * The implementation of this abstract class is auto-generated by
+ * {@link parquet.filter2.IncrementallyUpdatedFilterPredicateGenerator}
+ *
+ * Constructs a {@link IncrementallyUpdatedFilterPredicate} from a {@link parquet.filter2.predicate.FilterPredicate}
+ * This is how records are filtered during record assembly. The implementation is generated in order to avoid autoboxing.
+ *
+ * Note: the supplied predicate must not contain any instances of the not() operator as this is not
+ * supported by this filter.
+ *
+ * the supplied predicate should first be run through {@link parquet.filter2.predicate.LogicalInverseRewriter} to rewrite it
+ * in a form that doesn't make use of the not() operator.
+ *
+ * the supplied predicate should also have already been run through
+ * {@link parquet.filter2.predicate.SchemaCompatibilityValidator}
+ * to make sure it is compatible with the schema of this file.
+ *
+ * TODO: UserDefinedPredicates still autobox however
+ */
+public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements Visitor<IncrementallyUpdatedFilterPredicate> {
+ // guards against reuse: the inspector map below is per-build state
+ private boolean built = false;
+ private final Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn = new HashMap<ColumnPath, List<ValueInspector>>();
+
+ public IncrementallyUpdatedFilterPredicateBuilderBase() { }
+
+ /**
+ * Translates the supplied predicate; may only be called once per builder instance.
+ */
+ public final IncrementallyUpdatedFilterPredicate build(FilterPredicate pred) {
+ checkArgument(!built, "This builder has already been used");
+ IncrementallyUpdatedFilterPredicate incremental = pred.accept(this);
+ built = true;
+ return incremental;
+ }
+
+ // called by the generated subclass for each leaf node, recording which column it watches
+ protected final void addValueInspector(ColumnPath columnPath, ValueInspector valueInspector) {
+ List<ValueInspector> valueInspectors = valueInspectorsByColumn.get(columnPath);
+ if (valueInspectors == null) {
+ valueInspectors = new ArrayList<ValueInspector>();
+ valueInspectorsByColumn.put(columnPath, valueInspectors);
+ }
+ valueInspectors.add(valueInspector);
+ }
+
+ public Map<ColumnPath, List<ValueInspector>> getValueInspectorsByColumn() {
+ return valueInspectorsByColumn;
+ }
+
+ @Override
+ public final IncrementallyUpdatedFilterPredicate visit(And and) {
+ return new IncrementallyUpdatedFilterPredicate.And(and.getLeft().accept(this), and.getRight().accept(this));
+ }
+
+ @Override
+ public final IncrementallyUpdatedFilterPredicate visit(Or or) {
+ return new IncrementallyUpdatedFilterPredicate.Or(or.getLeft().accept(this), or.getRight().accept(this));
+ }
+
+ @Override
+ public final IncrementallyUpdatedFilterPredicate visit(Not not) {
+ throw new IllegalArgumentException(
+ "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
new file mode 100644
index 0000000..7536d8e
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
@@ -0,0 +1,45 @@
+package parquet.filter2.recordlevel;
+
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Visitor;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * Determines whether an {@link IncrementallyUpdatedFilterPredicate} is satisfied or not.
+ * This implementation makes the assumption that all {@link ValueInspector}s in an unknown state
+ * represent columns with a null value, and updates them accordingly.
+ *
+ * TODO: We could also build an evaluator that detects if enough values are known to determine the outcome
+ * TODO: of the predicate and quit the record assembly early. (https://issues.apache.org/jira/browse/PARQUET-37)
+ */
+public class IncrementallyUpdatedFilterPredicateEvaluator implements Visitor {
+ // stateless, so a single shared instance is safe
+ private static final IncrementallyUpdatedFilterPredicateEvaluator INSTANCE = new IncrementallyUpdatedFilterPredicateEvaluator();
+
+ public static boolean evaluate(IncrementallyUpdatedFilterPredicate pred) {
+ checkNotNull(pred, "pred");
+ return pred.accept(INSTANCE);
+ }
+
+ private IncrementallyUpdatedFilterPredicateEvaluator() {}
+
+ @Override
+ public boolean visit(ValueInspector p) {
+ // an inspector that never received a value is treated as having seen null
+ if (!p.isKnown()) {
+ p.updateNull();
+ }
+ return p.getResult();
+ }
+
+ @Override
+ public boolean visit(And and) {
+ return and.getLeft().accept(this) && and.getRight().accept(this);
+ }
+
+ @Override
+ public boolean visit(Or or) {
+ return or.getLeft().accept(this) || or.getRight().accept(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
new file mode 100644
index 0000000..c75ef45
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
@@ -0,0 +1,42 @@
+package parquet.filter2.recordlevel;
+
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Visitor;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * Resets all the {@link ValueInspector}s in a {@link IncrementallyUpdatedFilterPredicate}.
+ * Used after each record so the stateful predicate can evaluate the next one.
+ */
+public final class IncrementallyUpdatedFilterPredicateResetter implements Visitor {
+ // stateless, so a single shared instance is safe
+ private static final IncrementallyUpdatedFilterPredicateResetter INSTANCE = new IncrementallyUpdatedFilterPredicateResetter();
+
+ public static void reset(IncrementallyUpdatedFilterPredicate pred) {
+ checkNotNull(pred, "pred");
+ pred.accept(INSTANCE);
+ }
+
+ private IncrementallyUpdatedFilterPredicateResetter() { }
+
+ // the boolean return value required by Visitor is meaningless here; always false
+ @Override
+ public boolean visit(ValueInspector p) {
+ p.reset();
+ return false;
+ }
+
+ @Override
+ public boolean visit(And and) {
+ and.getLeft().accept(this);
+ and.getRight().accept(this);
+ return false;
+ }
+
+ @Override
+ public boolean visit(Or or) {
+ or.getLeft().accept(this);
+ or.getRight().accept(this);
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java b/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java
index b6239cb..a1a51c2 100644
--- a/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java
+++ b/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java
@@ -62,6 +62,12 @@ class FilteredRecordReader<T> extends RecordReaderImplementation<T> {
return super.read();
}
+ // FilteredRecordReader skips forwards itself, it never asks the layer above to do the skipping for it.
+ // This is different from how filtering is handled in the filter2 API
+ @Override
+ public boolean shouldSkipCurrentRecord() {
+ return false;
+ }
/**
* Skips forwards until the filter finds the first match. Returns false
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
index c1ffbe6..bc048b0 100644
--- a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
@@ -25,11 +25,23 @@ import parquet.column.ColumnWriter;
import parquet.column.impl.ColumnReadStoreImpl;
import parquet.column.page.PageReadStore;
import parquet.filter.UnboundRecordFilter;
+import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.FilterCompat.Filter;
+import parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
+import parquet.filter2.compat.FilterCompat.NoOpFilter;
+import parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat;
+import parquet.filter2.compat.FilterCompat.Visitor;
+import parquet.filter2.predicate.FilterPredicate;
+import parquet.filter2.recordlevel.FilteringRecordMaterializer;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicateBuilder;
import parquet.io.api.Binary;
import parquet.io.api.RecordConsumer;
import parquet.io.api.RecordMaterializer;
import parquet.schema.MessageType;
+import static parquet.Preconditions.checkNotNull;
+
/**
* Message level of the IO structure
*
@@ -55,32 +67,74 @@ public class MessageColumnIO extends GroupColumnIO {
return super.getColumnNames();
}
- public <T> RecordReader<T> getRecordReader(PageReadStore columns, RecordMaterializer<T> recordMaterializer) {
- if (leaves.size() > 0) {
- return new RecordReaderImplementation<T>(
- this,
- recordMaterializer,
- validating,
- new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType())
- );
- } else {
+ public <T> RecordReader<T> getRecordReader(PageReadStore columns,
+ RecordMaterializer<T> recordMaterializer) {
+ return getRecordReader(columns, recordMaterializer, FilterCompat.NOOP);
+ }
+
+ /**
+ * @deprecated use {@link #getRecordReader(PageReadStore, RecordMaterializer, Filter)}
+ */
+ @Deprecated
+ public <T> RecordReader<T> getRecordReader(PageReadStore columns,
+ RecordMaterializer<T> recordMaterializer,
+ UnboundRecordFilter filter) {
+ return getRecordReader(columns, recordMaterializer, FilterCompat.get(filter));
+ }
+
+ public <T> RecordReader<T> getRecordReader(final PageReadStore columns,
+ final RecordMaterializer<T> recordMaterializer,
+ final Filter filter) {
+ checkNotNull(columns, "columns");
+ checkNotNull(recordMaterializer, "recordMaterializer");
+ checkNotNull(filter, "filter");
+
+ if (leaves.isEmpty()) {
return new EmptyRecordReader<T>(recordMaterializer);
}
- }
- public <T> RecordReader<T> getRecordReader(PageReadStore columns, RecordMaterializer<T> recordMaterializer,
- UnboundRecordFilter unboundFilter) {
-
- return (unboundFilter == null)
- ? getRecordReader(columns, recordMaterializer)
- : new FilteredRecordReader<T>(
- this,
- recordMaterializer,
- validating,
- new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()),
- unboundFilter,
- columns.getRowCount()
- );
+ return filter.accept(new Visitor<RecordReader<T>>() {
+ @Override
+ public RecordReader<T> visit(FilterPredicateCompat filterPredicateCompat) {
+
+ FilterPredicate predicate = filterPredicateCompat.getFilterPredicate();
+ IncrementallyUpdatedFilterPredicateBuilder builder = new IncrementallyUpdatedFilterPredicateBuilder();
+ IncrementallyUpdatedFilterPredicate streamingPredicate = builder.build(predicate);
+ RecordMaterializer<T> filteringRecordMaterializer = new FilteringRecordMaterializer<T>(
+ recordMaterializer,
+ leaves,
+ builder.getValueInspectorsByColumn(),
+ streamingPredicate);
+
+ return new RecordReaderImplementation<T>(
+ MessageColumnIO.this,
+ filteringRecordMaterializer,
+ validating,
+ new ColumnReadStoreImpl(columns, filteringRecordMaterializer.getRootConverter(), getType()));
+ }
+
+ @Override
+ public RecordReader<T> visit(UnboundRecordFilterCompat unboundRecordFilterCompat) {
+ return new FilteredRecordReader<T>(
+ MessageColumnIO.this,
+ recordMaterializer,
+ validating,
+ new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()),
+ unboundRecordFilterCompat.getUnboundRecordFilter(),
+ columns.getRowCount()
+ );
+
+ }
+
+ @Override
+ public RecordReader<T> visit(NoOpFilter noOpFilter) {
+ return new RecordReaderImplementation<T>(
+ MessageColumnIO.this,
+ recordMaterializer,
+ validating,
+ new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()));
+ }
+ });
}
private class MessageColumnIORecordConsumer extends RecordConsumer {
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/io/RecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/RecordReader.java b/parquet-column/src/main/java/parquet/io/RecordReader.java
index f01b02a..e0cfeb6 100644
--- a/parquet-column/src/main/java/parquet/io/RecordReader.java
+++ b/parquet-column/src/main/java/parquet/io/RecordReader.java
@@ -25,9 +25,16 @@ package parquet.io;
public abstract class RecordReader<T> {
/**
- * reads one record and returns it
+ * Reads one record and returns it.
* @return the materialized record
*/
public abstract T read();
+ /**
+ * Returns whether the current record should be skipped (dropped)
+ * Will be called *after* read()
+ */
+ public boolean shouldSkipCurrentRecord() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
index c5d7da7..247900e 100644
--- a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
+++ b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
@@ -234,6 +234,8 @@ class RecordReaderImplementation<T> extends RecordReader<T> {
private State[] states;
private ColumnReader[] columnReaders;
+ private boolean shouldSkipCurrentRecord = false;
+
/**
* @param root the root of the schema
* @param recordMaterializer responsible of materializing the records
@@ -411,7 +413,17 @@ class RecordReaderImplementation<T> extends RecordReader<T> {
currentState = currentState.nextState[nextR];
} while (currentState != null);
recordRootConverter.end();
- return recordMaterializer.getCurrentRecord();
+ T record = recordMaterializer.getCurrentRecord();
+ shouldSkipCurrentRecord = record == null;
+ if (shouldSkipCurrentRecord) {
+ recordMaterializer.skipCurrentRecord();
+ }
+ return record;
+ }
+
+ @Override
+ public boolean shouldSkipCurrentRecord() {
+ return shouldSkipCurrentRecord;
}
private static void log(String string) {
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/io/api/Binary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/api/Binary.java b/parquet-column/src/main/java/parquet/io/api/Binary.java
index 1ef23fb..432f075 100644
--- a/parquet-column/src/main/java/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/parquet/io/api/Binary.java
@@ -15,11 +15,11 @@
*/
package parquet.io.api;
-import static parquet.bytes.BytesUtils.UTF8;
-
import java.io.DataOutput;
import java.io.IOException;
+import java.io.ObjectStreamException;
import java.io.OutputStream;
+import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -27,230 +27,325 @@ import java.util.Arrays;
import parquet.bytes.BytesUtils;
import parquet.io.ParquetEncodingException;
-abstract public class Binary {
+import static parquet.bytes.BytesUtils.UTF8;
+
+abstract public class Binary implements Comparable<Binary>, Serializable {
+
+ // this isn't really something others should extend
+ private Binary() { }
public static final Binary EMPTY = fromByteArray(new byte[0]);
- public static Binary fromByteArray(
- final byte[] value,
- final int offset,
- final int length) {
-
- return new Binary() {
- @Override
- public String toStringUsingUTF8() {
- return UTF8.decode(ByteBuffer.wrap(value, offset, length)).toString();
- // TODO: figure out why the following line was much slower
- // rdb: new String(...) is slower because it instantiates a new Decoder,
- // while Charset#decode uses a thread-local decoder cache
- // return new String(value, offset, length, BytesUtils.UTF8);
- }
+ abstract public String toStringUsingUTF8();
- @Override
- public int length() {
- return length;
- }
+ abstract public int length();
- @Override
- public void writeTo(OutputStream out) throws IOException {
- out.write(value, offset, length);
- }
+ abstract public void writeTo(OutputStream out) throws IOException;
- @Override
- public byte[] getBytes() {
- return Arrays.copyOfRange(value, offset, offset + length);
- }
+ abstract public void writeTo(DataOutput out) throws IOException;
- @Override
- public int hashCode() {
- return Binary.hashCode(value, offset, length);
- }
+ abstract public byte[] getBytes();
- @Override
- boolean equals(Binary other) {
- return other.equals(value, offset, length);
- }
+ abstract boolean equals(byte[] bytes, int offset, int length);
- @Override
- boolean equals(byte[] other, int otherOffset, int otherLength) {
- return Binary.equals(value, offset, length, other, otherOffset, otherLength);
- }
+ abstract boolean equals(Binary other);
- @Override
- public int compareTo(Binary other) {
- return other.compareTo(value, offset, length);
- }
+ abstract public int compareTo(Binary other);
- @Override
- int compareTo(byte[] other, int otherOffset, int otherLength) {
- return Binary.compareTwoByteArrays(value, offset, length, other, otherOffset, otherLength);
- }
+ abstract int compareTo(byte[] bytes, int offset, int length);
- @Override
- public ByteBuffer toByteBuffer() {
- return ByteBuffer.wrap(value, offset, length);
- }
+ abstract public ByteBuffer toByteBuffer();
- @Override
- public void writeTo(DataOutput out) throws IOException {
- out.write(value, offset, length);
- }
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (obj instanceof Binary) {
+ return equals((Binary)obj);
+ }
+ return false;
+ }
- };
+ @Override
+ public String toString() {
+ return "Binary{" + length() + " bytes, " + Arrays.toString(getBytes()) + "}";
}
- public static Binary fromByteArray(final byte[] value) {
- return new Binary() {
- @Override
- public String toStringUsingUTF8() {
- return new String(value, BytesUtils.UTF8);
- }
+ private static class ByteArraySliceBackedBinary extends Binary {
+ private final byte[] value;
+ private final int offset;
+ private final int length;
- @Override
- public int length() {
- return value.length;
- }
+ public ByteArraySliceBackedBinary(byte[] value, int offset, int length) {
+ this.value = value;
+ this.offset = offset;
+ this.length = length;
+ }
- @Override
- public void writeTo(OutputStream out) throws IOException {
- out.write(value);
- }
+ @Override
+ public String toStringUsingUTF8() {
+ return UTF8.decode(ByteBuffer.wrap(value, offset, length)).toString();
+ // TODO: figure out why the following line was much slower
+ // rdb: new String(...) is slower because it instantiates a new Decoder,
+ // while Charset#decode uses a thread-local decoder cache
+ // return new String(value, offset, length, BytesUtils.UTF8);
+ }
- @Override
- public byte[] getBytes() {
- return value;
- }
+ @Override
+ public int length() {
+ return length;
+ }
- @Override
- public int hashCode() {
- return Binary.hashCode(value, 0, value.length);
- }
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ out.write(value, offset, length);
+ }
- @Override
- boolean equals(Binary other) {
- return other.equals(value, 0, value.length);
- }
+ @Override
+ public byte[] getBytes() {
+ return Arrays.copyOfRange(value, offset, offset + length);
+ }
- @Override
- boolean equals(byte[] other, int otherOffset, int otherLength) {
- return Binary.equals(value, 0, value.length, other, otherOffset, otherLength);
- }
+ @Override
+ public int hashCode() {
+ return Binary.hashCode(value, offset, length);
+ }
- @Override
- public int compareTo(Binary other) {
- return other.compareTo(value, 0, value.length);
- }
+ @Override
+ boolean equals(Binary other) {
+ return other.equals(value, offset, length);
+ }
- @Override
- int compareTo(byte[] other, int otherOffset, int otherLength) {
- return Binary.compareTwoByteArrays(value, 0, value.length, other, otherOffset, otherLength);
- }
+ @Override
+ boolean equals(byte[] other, int otherOffset, int otherLength) {
+ return Binary.equals(value, offset, length, other, otherOffset, otherLength);
+ }
- @Override
- public ByteBuffer toByteBuffer() {
- return ByteBuffer.wrap(value);
- }
+ @Override
+ public int compareTo(Binary other) {
+ return other.compareTo(value, offset, length);
+ }
+
+ @Override
+ int compareTo(byte[] other, int otherOffset, int otherLength) {
+ return Binary.compareTwoByteArrays(value, offset, length, other, otherOffset, otherLength);
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer() {
+ return ByteBuffer.wrap(value, offset, length);
+ }
+
+ @Override
+ public void writeTo(DataOutput out) throws IOException {
+ out.write(value, offset, length);
+ }
- @Override
- public void writeTo(DataOutput out) throws IOException {
- out.write(value);
- }
- };
}
- public static Binary fromByteBuffer(final ByteBuffer value) {
- return new Binary() {
- @Override
- public String toStringUsingUTF8() {
- return new String(getBytes(), BytesUtils.UTF8);
- }
+ private static class FromStringBinary extends ByteArrayBackedBinary {
+ public FromStringBinary(byte[] value) {
+ super(value);
+ }
- @Override
- public int length() {
- return value.remaining();
- }
+ @Override
+ public String toString() {
+ return "Binary{\"" + toStringUsingUTF8() + "\"}";
+ }
+ }
- @Override
- public void writeTo(OutputStream out) throws IOException {
- // TODO: should not have to materialize those bytes
- out.write(getBytes());
- }
+ public static Binary fromByteArray(final byte[] value, final int offset, final int length) {
+ return new ByteArraySliceBackedBinary(value, offset, length);
+ }
- @Override
- public byte[] getBytes() {
- byte[] bytes = new byte[value.remaining()];
+ private static class ByteArrayBackedBinary extends Binary {
+ private final byte[] value;
- value.mark();
- value.get(bytes).reset();
- return bytes;
- }
+ public ByteArrayBackedBinary(byte[] value) {
+ this.value = value;
+ }
- @Override
- public int hashCode() {
- if (value.hasArray()) {
- return Binary.hashCode(value.array(), value.arrayOffset() + value.position(),
- value.arrayOffset() + value.remaining());
- }
- byte[] bytes = getBytes();
- return Binary.hashCode(bytes, 0, bytes.length);
- }
+ @Override
+ public String toStringUsingUTF8() {
+ return new String(value, BytesUtils.UTF8);
+ }
- @Override
- boolean equals(Binary other) {
- if (value.hasArray()) {
- return other.equals(value.array(), value.arrayOffset() + value.position(),
- value.arrayOffset() + value.remaining());
- }
- byte[] bytes = getBytes();
- return other.equals(bytes, 0, bytes.length);
- }
+ @Override
+ public int length() {
+ return value.length;
+ }
+
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ out.write(value);
+ }
+
+ @Override
+ public byte[] getBytes() {
+ return value;
+ }
- @Override
- boolean equals(byte[] other, int otherOffset, int otherLength) {
- if (value.hasArray()) {
- return Binary.equals(value.array(), value.arrayOffset() + value.position(),
- value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
- }
- byte[] bytes = getBytes();
- return Binary.equals(bytes, 0, bytes.length, other, otherOffset, otherLength);
+ @Override
+ public int hashCode() {
+ return Binary.hashCode(value, 0, value.length);
+ }
+
+ @Override
+ boolean equals(Binary other) {
+ return other.equals(value, 0, value.length);
+ }
+
+ @Override
+ boolean equals(byte[] other, int otherOffset, int otherLength) {
+ return Binary.equals(value, 0, value.length, other, otherOffset, otherLength);
+ }
+
+ @Override
+ public int compareTo(Binary other) {
+ return other.compareTo(value, 0, value.length);
+ }
+
+ @Override
+ int compareTo(byte[] other, int otherOffset, int otherLength) {
+ return Binary.compareTwoByteArrays(value, 0, value.length, other, otherOffset, otherLength);
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer() {
+ return ByteBuffer.wrap(value);
+ }
+
+ @Override
+ public void writeTo(DataOutput out) throws IOException {
+ out.write(value);
+ }
+
+ }
+
+ public static Binary fromByteArray(final byte[] value) {
+ return new ByteArrayBackedBinary(value);
+ }
+
+ private static class ByteBufferBackedBinary extends Binary {
+ private transient ByteBuffer value;
+
+ public ByteBufferBackedBinary(ByteBuffer value) {
+ this.value = value;
+ }
+
+ @Override
+ public String toStringUsingUTF8() {
+ return new String(getBytes(), BytesUtils.UTF8);
+ }
+
+ @Override
+ public int length() {
+ return value.remaining();
+ }
+
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ // TODO: should not have to materialize those bytes
+ out.write(getBytes());
+ }
+
+ @Override
+ public byte[] getBytes() {
+ byte[] bytes = new byte[value.remaining()];
+
+ value.mark();
+ value.get(bytes).reset();
+ return bytes;
+ }
+
+ @Override
+ public int hashCode() {
+ if (value.hasArray()) {
+ return Binary.hashCode(value.array(), value.arrayOffset() + value.position(),
+ value.arrayOffset() + value.remaining());
}
+ byte[] bytes = getBytes();
+ return Binary.hashCode(bytes, 0, bytes.length);
+ }
- @Override
- public int compareTo(Binary other) {
- if (value.hasArray()) {
- return other.compareTo(value.array(), value.arrayOffset() + value.position(),
- value.arrayOffset() + value.remaining());
- }
- byte[] bytes = getBytes();
- return other.compareTo(bytes, 0, bytes.length);
+ @Override
+ boolean equals(Binary other) {
+ if (value.hasArray()) {
+ return other.equals(value.array(), value.arrayOffset() + value.position(),
+ value.arrayOffset() + value.remaining());
}
+ byte[] bytes = getBytes();
+ return other.equals(bytes, 0, bytes.length);
+ }
- @Override
- int compareTo(byte[] other, int otherOffset, int otherLength) {
- if (value.hasArray()) {
- return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + value.position(),
- value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
- }
- byte[] bytes = getBytes();
- return Binary.compareTwoByteArrays(bytes, 0, bytes.length, other, otherOffset, otherLength);
+ @Override
+ boolean equals(byte[] other, int otherOffset, int otherLength) {
+ if (value.hasArray()) {
+ return Binary.equals(value.array(), value.arrayOffset() + value.position(),
+ value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
}
+ byte[] bytes = getBytes();
+ return Binary.equals(bytes, 0, bytes.length, other, otherOffset, otherLength);
+ }
- @Override
- public ByteBuffer toByteBuffer() {
- return value;
+ @Override
+ public int compareTo(Binary other) {
+ if (value.hasArray()) {
+ return other.compareTo(value.array(), value.arrayOffset() + value.position(),
+ value.arrayOffset() + value.remaining());
}
+ byte[] bytes = getBytes();
+ return other.compareTo(bytes, 0, bytes.length);
+ }
- @Override
- public void writeTo(DataOutput out) throws IOException {
- // TODO: should not have to materialize those bytes
- out.write(getBytes());
+ @Override
+ int compareTo(byte[] other, int otherOffset, int otherLength) {
+ if (value.hasArray()) {
+ return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + value.position(),
+ value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
}
- };
+ byte[] bytes = getBytes();
+ return Binary.compareTwoByteArrays(bytes, 0, bytes.length, other, otherOffset, otherLength);
+ }
+
+ @Override
+ public ByteBuffer toByteBuffer() {
+ return value;
+ }
+
+ @Override
+ public void writeTo(DataOutput out) throws IOException {
+ // TODO: should not have to materialize those bytes
+ out.write(getBytes());
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+ byte[] bytes = getBytes();
+ out.writeInt(bytes.length);
+ out.write(bytes);
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+ int length = in.readInt();
+ byte[] bytes = new byte[length];
+ in.readFully(bytes, 0, length);
+ this.value = ByteBuffer.wrap(bytes);
+ }
+
+ private void readObjectNoData() throws ObjectStreamException {
+ this.value = ByteBuffer.wrap(new byte[0]);
+ }
+
+ }
+
+ public static Binary fromByteBuffer(final ByteBuffer value) {
+ return new ByteBufferBackedBinary(value);
}
public static Binary fromString(final String value) {
try {
- return fromByteArray(value.getBytes("UTF-8"));
+ return new FromStringBinary(value.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
throw new ParquetEncodingException("UTF-8 not supported.", e);
}
@@ -313,39 +408,4 @@ abstract public class Binary {
else if (length1 < length2) { return 1;}
else { return -1; }
}
-
- abstract public String toStringUsingUTF8();
-
- abstract public int length();
-
- abstract public void writeTo(OutputStream out) throws IOException;
-
- abstract public void writeTo(DataOutput out) throws IOException;
-
- abstract public byte[] getBytes();
-
- abstract boolean equals(byte[] bytes, int offset, int length);
-
- abstract boolean equals(Binary other);
-
- abstract public int compareTo(Binary other);
-
- abstract int compareTo(byte[] bytes, int offset, int length);
-
- @Override
- public boolean equals(Object obj) {
- if (obj == null) {
- return false;
- }
- if (obj instanceof Binary) {
- return equals((Binary)obj);
- }
- return false;
- }
-
- abstract public ByteBuffer toByteBuffer();
-
- public String toString() {
- return "Binary{" + length() + " bytes, " + Arrays.toString(getBytes()) + "}";
- };
}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/io/api/RecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/api/RecordMaterializer.java b/parquet-column/src/main/java/parquet/io/api/RecordMaterializer.java
index 7d90c6a..7ff0f1c 100644
--- a/parquet-column/src/main/java/parquet/io/api/RecordMaterializer.java
+++ b/parquet-column/src/main/java/parquet/io/api/RecordMaterializer.java
@@ -34,6 +34,11 @@ abstract public class RecordMaterializer<T> {
abstract public T getCurrentRecord();
/**
+ * Called if {@link #getCurrentRecord()} isn't going to be called.
+ */
+ public void skipCurrentRecord() { }
+
+ /**
* @return the root converter for this tree
*/
abstract public GroupConverter getRootConverter();
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/predicate/DummyUdp.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/DummyUdp.java b/parquet-column/src/test/java/parquet/filter2/predicate/DummyUdp.java
new file mode 100644
index 0000000..277fa43
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/predicate/DummyUdp.java
@@ -0,0 +1,19 @@
+package parquet.filter2.predicate;
+
+public class DummyUdp extends UserDefinedPredicate<Integer> {
+
+ @Override
+ public boolean keep(Integer value) {
+ return false;
+ }
+
+ @Override
+ public boolean canDrop(Statistics<Integer> statistics) {
+ return false;
+ }
+
+ @Override
+ public boolean inverseCanDrop(Statistics<Integer> statistics) {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/predicate/TestFilterApiMethods.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/TestFilterApiMethods.java b/parquet-column/src/test/java/parquet/filter2/predicate/TestFilterApiMethods.java
new file mode 100644
index 0000000..dafd7fd
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/predicate/TestFilterApiMethods.java
@@ -0,0 +1,103 @@
+package parquet.filter2.predicate;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.junit.Test;
+
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.predicate.Operators.And;
+import parquet.filter2.predicate.Operators.BinaryColumn;
+import parquet.filter2.predicate.Operators.DoubleColumn;
+import parquet.filter2.predicate.Operators.Eq;
+import parquet.filter2.predicate.Operators.Gt;
+import parquet.filter2.predicate.Operators.IntColumn;
+import parquet.filter2.predicate.Operators.Not;
+import parquet.filter2.predicate.Operators.Or;
+import parquet.filter2.predicate.Operators.UserDefined;
+import parquet.io.api.Binary;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.binaryColumn;
+import static parquet.filter2.predicate.FilterApi.doubleColumn;
+import static parquet.filter2.predicate.FilterApi.eq;
+import static parquet.filter2.predicate.FilterApi.gt;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.not;
+import static parquet.filter2.predicate.FilterApi.notEq;
+import static parquet.filter2.predicate.FilterApi.or;
+import static parquet.filter2.predicate.FilterApi.userDefined;
+import static parquet.filter2.predicate.Operators.NotEq;
+
+public class TestFilterApiMethods {
+
+ private static final IntColumn intColumn = intColumn("a.b.c");
+ private static final DoubleColumn doubleColumn = doubleColumn("x.y.z");
+ private static final BinaryColumn binColumn = binaryColumn("a.string.column");
+
+ private static final FilterPredicate predicate =
+ and(not(or(eq(intColumn, 7), notEq(intColumn, 17))), gt(doubleColumn, 100.0));
+
+ @Test
+ public void testFilterPredicateCreation() {
+ FilterPredicate outerAnd = predicate;
+
+ assertTrue(outerAnd instanceof And);
+
+ FilterPredicate not = ((And) outerAnd).getLeft();
+ FilterPredicate gt = ((And) outerAnd).getRight();
+ assertTrue(not instanceof Not);
+
+ FilterPredicate or = ((Not) not).getPredicate();
+ assertTrue(or instanceof Or);
+
+ FilterPredicate leftEq = ((Or) or).getLeft();
+ FilterPredicate rightNotEq = ((Or) or).getRight();
+ assertTrue(leftEq instanceof Eq);
+ assertTrue(rightNotEq instanceof NotEq);
+ assertEquals(7, ((Eq) leftEq).getValue());
+ assertEquals(17, ((NotEq) rightNotEq).getValue());
+ assertEquals(ColumnPath.get("a", "b", "c"), ((Eq) leftEq).getColumn().getColumnPath());
+ assertEquals(ColumnPath.get("a", "b", "c"), ((NotEq) rightNotEq).getColumn().getColumnPath());
+
+ assertTrue(gt instanceof Gt);
+ assertEquals(100.0, ((Gt) gt).getValue());
+ assertEquals(ColumnPath.get("x", "y", "z"), ((Gt) gt).getColumn().getColumnPath());
+ }
+
+ @Test
+ public void testToString() {
+ FilterPredicate pred = or(predicate, notEq(binColumn, Binary.fromString("foobarbaz")));
+ assertEquals("or(and(not(or(eq(a.b.c, 7), noteq(a.b.c, 17))), gt(x.y.z, 100.0)), "
+ + "noteq(a.string.column, Binary{\"foobarbaz\"}))",
+ pred.toString());
+ }
+
+ @Test
+ public void testUdp() {
+ FilterPredicate predicate = or(eq(doubleColumn, 12.0), userDefined(intColumn, DummyUdp.class));
+ assertTrue(predicate instanceof Or);
+ FilterPredicate ud = ((Or) predicate).getRight();
+ assertTrue(ud instanceof UserDefined);
+ assertEquals(DummyUdp.class, ((UserDefined) ud).getUserDefinedPredicateClass());
+ assertTrue(((UserDefined) ud).getUserDefinedPredicate() instanceof DummyUdp);
+ }
+
+ @Test
+ public void testSerializable() throws Exception {
+ BinaryColumn binary = binaryColumn("foo");
+ FilterPredicate p = or(and(userDefined(intColumn, DummyUdp.class), predicate), eq(binary, Binary.fromString("hi")));
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(p);
+ oos.close();
+
+ ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+ FilterPredicate read = (FilterPredicate) is.readObject();
+ assertEquals(p, read);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverseRewriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverseRewriter.java b/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverseRewriter.java
new file mode 100644
index 0000000..0aa360b
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverseRewriter.java
@@ -0,0 +1,85 @@
+package parquet.filter2.predicate;
+
+import org.junit.Test;
+
+import parquet.filter2.predicate.Operators.DoubleColumn;
+import parquet.filter2.predicate.Operators.IntColumn;
+import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import parquet.filter2.predicate.Operators.UserDefined;
+
+import static org.junit.Assert.assertEquals;
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.doubleColumn;
+import static parquet.filter2.predicate.FilterApi.eq;
+import static parquet.filter2.predicate.FilterApi.gt;
+import static parquet.filter2.predicate.FilterApi.gtEq;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.lt;
+import static parquet.filter2.predicate.FilterApi.ltEq;
+import static parquet.filter2.predicate.FilterApi.not;
+import static parquet.filter2.predicate.FilterApi.notEq;
+import static parquet.filter2.predicate.FilterApi.or;
+import static parquet.filter2.predicate.FilterApi.userDefined;
+import static parquet.filter2.predicate.LogicalInverseRewriter.rewrite;
+
+public class TestLogicalInverseRewriter {
+ private static final IntColumn intColumn = intColumn("a.b.c");
+ private static final DoubleColumn doubleColumn = doubleColumn("a.b.c");
+
+ private static final FilterPredicate complex =
+ and(
+ not(
+ or(ltEq(doubleColumn, 12.0),
+ and(
+ not(or(eq(intColumn, 7), notEq(intColumn, 17))),
+ userDefined(intColumn, DummyUdp.class)))),
+ or(gt(doubleColumn, 100.0), not(gtEq(intColumn, 77))));
+
+ private static final FilterPredicate complexCollapsed =
+ and(
+ and(gt(doubleColumn, 12.0),
+ or(
+ or(eq(intColumn, 7), notEq(intColumn, 17)),
+ new LogicalNotUserDefined<Integer, DummyUdp>(userDefined(intColumn, DummyUdp.class)))),
+ or(gt(doubleColumn, 100.0), lt(intColumn, 77)));
+
+ private static void assertNoOp(FilterPredicate p) {
+ assertEquals(p, rewrite(p));
+ }
+
+ @Test
+ public void testBaseCases() {
+ UserDefined<Integer, DummyUdp> ud = userDefined(intColumn, DummyUdp.class);
+
+ assertNoOp(eq(intColumn, 17));
+ assertNoOp(notEq(intColumn, 17));
+ assertNoOp(lt(intColumn, 17));
+ assertNoOp(ltEq(intColumn, 17));
+ assertNoOp(gt(intColumn, 17));
+ assertNoOp(gtEq(intColumn, 17));
+ assertNoOp(and(eq(intColumn, 17), eq(doubleColumn, 12.0)));
+ assertNoOp(or(eq(intColumn, 17), eq(doubleColumn, 12.0)));
+ assertNoOp(ud);
+
+ assertEquals(notEq(intColumn, 17), rewrite(not(eq(intColumn, 17))));
+ assertEquals(eq(intColumn, 17), rewrite(not(notEq(intColumn, 17))));
+ assertEquals(gtEq(intColumn, 17), rewrite(not(lt(intColumn, 17))));
+ assertEquals(gt(intColumn, 17), rewrite(not(ltEq(intColumn, 17))));
+ assertEquals(ltEq(intColumn, 17), rewrite(not(gt(intColumn, 17))));
+ assertEquals(lt(intColumn, 17), rewrite(not(gtEq(intColumn, 17))));
+ assertEquals(new LogicalNotUserDefined<Integer, DummyUdp>(ud), rewrite(not(ud)));
+
+ FilterPredicate notedAnd = not(and(eq(intColumn, 17), eq(doubleColumn, 12.0)));
+ FilterPredicate distributedAnd = or(notEq(intColumn, 17), notEq(doubleColumn, 12.0));
+ assertEquals(distributedAnd, rewrite(notedAnd));
+
+ FilterPredicate andWithNots = and(not(gtEq(intColumn, 17)), lt(intColumn, 7));
+ FilterPredicate andWithoutNots = and(lt(intColumn, 17), lt(intColumn, 7));
+ assertEquals(andWithoutNots, rewrite(andWithNots));
+ }
+
+ @Test
+ public void testComplex() {
+ assertEquals(complexCollapsed, rewrite(complex));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverter.java b/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverter.java
new file mode 100644
index 0000000..19e6b68
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverter.java
@@ -0,0 +1,76 @@
+package parquet.filter2.predicate;
+
+import org.junit.Test;
+
+import parquet.filter2.predicate.Operators.DoubleColumn;
+import parquet.filter2.predicate.Operators.IntColumn;
+import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import parquet.filter2.predicate.Operators.UserDefined;
+
+import static org.junit.Assert.assertEquals;
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.doubleColumn;
+import static parquet.filter2.predicate.FilterApi.eq;
+import static parquet.filter2.predicate.FilterApi.gt;
+import static parquet.filter2.predicate.FilterApi.gtEq;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.lt;
+import static parquet.filter2.predicate.FilterApi.ltEq;
+import static parquet.filter2.predicate.FilterApi.not;
+import static parquet.filter2.predicate.FilterApi.notEq;
+import static parquet.filter2.predicate.FilterApi.or;
+import static parquet.filter2.predicate.FilterApi.userDefined;
+import static parquet.filter2.predicate.LogicalInverter.invert;
+
+public class TestLogicalInverter {
+ private static final IntColumn intColumn = intColumn("a.b.c");
+ private static final DoubleColumn doubleColumn = doubleColumn("a.b.c");
+
+ private static final UserDefined<Integer, DummyUdp> ud = userDefined(intColumn, DummyUdp.class);
+
+ private static final FilterPredicate complex =
+ and(
+ or(ltEq(doubleColumn, 12.0),
+ and(
+ not(or(eq(intColumn, 7), notEq(intColumn, 17))),
+ userDefined(intColumn, DummyUdp.class))),
+ or(gt(doubleColumn, 100.0), notEq(intColumn, 77)));
+
+ private static final FilterPredicate complexInverse =
+ or(
+ and(gt(doubleColumn, 12.0),
+ or(
+ or(eq(intColumn, 7), notEq(intColumn, 17)),
+ new LogicalNotUserDefined<Integer, DummyUdp>(userDefined(intColumn, DummyUdp.class)))),
+ and(ltEq(doubleColumn, 100.0), eq(intColumn, 77)));
+
+ @Test
+ public void testBaseCases() {
+ assertEquals(notEq(intColumn, 17), invert(eq(intColumn, 17)));
+ assertEquals(eq(intColumn, 17), invert(notEq(intColumn, 17)));
+ assertEquals(gtEq(intColumn, 17), invert(lt(intColumn, 17)));
+ assertEquals(gt(intColumn, 17), invert(ltEq(intColumn, 17)));
+ assertEquals(ltEq(intColumn, 17), invert(gt(intColumn, 17)));
+ assertEquals(lt(intColumn, 17), invert(gtEq(intColumn, 17)));
+
+ FilterPredicate andPos = and(eq(intColumn, 17), eq(doubleColumn, 12.0));
+ FilterPredicate andInv = or(notEq(intColumn, 17), notEq(doubleColumn, 12.0));
+ assertEquals(andInv, invert(andPos));
+
+ FilterPredicate orPos = or(eq(intColumn, 17), eq(doubleColumn, 12.0));
+ FilterPredicate orInv = and(notEq(intColumn, 17), notEq(doubleColumn, 12.0));
+ assertEquals(orPos, invert(orInv));
+
+ assertEquals(eq(intColumn, 17), invert(not(eq(intColumn, 17))));
+
+ UserDefined<Integer, DummyUdp> ud = userDefined(intColumn, DummyUdp.class);
+ assertEquals(new LogicalNotUserDefined<Integer, DummyUdp>(ud), invert(ud));
+ assertEquals(ud, invert(not(ud)));
+ assertEquals(ud, invert(new LogicalNotUserDefined<Integer, DummyUdp>(ud)));
+ }
+
+ @Test
+ public void testComplex() {
+ assertEquals(complexInverse, invert(complex));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java b/parquet-column/src/test/java/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
new file mode 100644
index 0000000..e9e745f
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
@@ -0,0 +1,124 @@
+package parquet.filter2.predicate;
+
+import org.junit.Test;
+
+import parquet.filter2.predicate.Operators.BinaryColumn;
+import parquet.filter2.predicate.Operators.IntColumn;
+import parquet.filter2.predicate.Operators.LongColumn;
+import parquet.io.api.Binary;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.binaryColumn;
+import static parquet.filter2.predicate.FilterApi.eq;
+import static parquet.filter2.predicate.FilterApi.gt;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.longColumn;
+import static parquet.filter2.predicate.FilterApi.ltEq;
+import static parquet.filter2.predicate.FilterApi.not;
+import static parquet.filter2.predicate.FilterApi.notEq;
+import static parquet.filter2.predicate.FilterApi.or;
+import static parquet.filter2.predicate.FilterApi.userDefined;
+import static parquet.filter2.predicate.SchemaCompatibilityValidator.validate;
+
+public class TestSchemaCompatibilityValidator {
+ private static final BinaryColumn stringC = binaryColumn("c");
+ private static final LongColumn longBar = longColumn("x.bar");
+ private static final IntColumn intBar = intColumn("x.bar");
+ private static final LongColumn lotsOfLongs = longColumn("lotsOfLongs");
+
+ private static final String schemaString =
+ "message Document {\n"
+ + " required int32 a;\n"
+ + " required binary b;\n"
+ + " required binary c (UTF8);\n"
+ + " required group x { required int32 bar; }\n"
+ + " repeated int64 lotsOfLongs;\n"
+ + "}\n";
+
+ private static final MessageType schema = MessageTypeParser.parseMessageType(schemaString);
+
+ private static final FilterPredicate complexValid =
+ and(
+ or(ltEq(stringC, Binary.fromString("foo")),
+ and(
+ not(or(eq(intBar, 17), notEq(intBar, 17))),
+ userDefined(intBar, DummyUdp.class))),
+ or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz"))));
+
+ static class LongDummyUdp extends UserDefinedPredicate<Long> {
+ @Override
+ public boolean keep(Long value) {
+ return false;
+ }
+
+ @Override
+ public boolean canDrop(Statistics<Long> statistics) {
+ return false;
+ }
+
+ @Override
+ public boolean inverseCanDrop(Statistics<Long> statistics) {
+ return false;
+ }
+ }
+
+ private static final FilterPredicate complexWrongType =
+ and(
+ or(ltEq(stringC, Binary.fromString("foo")),
+ and(
+ not(or(eq(longBar, 17L), notEq(longBar, 17L))),
+ userDefined(longBar, LongDummyUdp.class))),
+ or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz"))));
+
+ private static final FilterPredicate complexMixedType =
+ and(
+ or(ltEq(stringC, Binary.fromString("foo")),
+ and(
+ not(or(eq(intBar, 17), notEq(longBar, 17L))),
+ userDefined(longBar, LongDummyUdp.class))),
+ or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz"))));
+
+ @Test
+ public void testValidType() {
+ validate(complexValid, schema);
+ }
+
+ @Test
+ public void testFindsInvalidTypes() {
+ try {
+ validate(complexWrongType, schema);
+ fail("this should throw");
+ } catch (IllegalArgumentException e) {
+ assertEquals("FilterPredicate column: x.bar's declared type (java.lang.Long) does not match the schema found in file metadata. "
+ + "Column x.bar is of type: FullTypeDescriptor(PrimitiveType: INT32, OriginalType: null)\n"
+ + "Valid types for this column are: [class java.lang.Integer]", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testTwiceDeclaredColumn() {
+ validate(eq(stringC, Binary.fromString("larry")), schema);
+
+ try {
+ validate(complexMixedType, schema);
+ fail("this should throw");
+ } catch (IllegalArgumentException e) {
+ assertEquals("Column: x.bar was provided with different types in the same predicate. Found both: (class java.lang.Integer, class java.lang.Long)", e.getMessage());
+ }
+
+ }
+
+ @Test
+ public void testRepeatedNotSupported() {
+ try {
+ validate(eq(lotsOfLongs, 10l), schema);
+ fail("this should throw");
+ } catch (IllegalArgumentException e) {
+ assertEquals("FilterPredicates do not currently support repeated columns. Column lotsOfLongs is repeated.", e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/predicate/TestValidTypeMap.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/TestValidTypeMap.java b/parquet-column/src/test/java/parquet/filter2/predicate/TestValidTypeMap.java
new file mode 100644
index 0000000..07f2597
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/predicate/TestValidTypeMap.java
@@ -0,0 +1,93 @@
+package parquet.filter2.predicate;
+
+import org.junit.Test;
+
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.predicate.Operators.BinaryColumn;
+import parquet.filter2.predicate.Operators.BooleanColumn;
+import parquet.filter2.predicate.Operators.Column;
+import parquet.filter2.predicate.Operators.DoubleColumn;
+import parquet.filter2.predicate.Operators.FloatColumn;
+import parquet.filter2.predicate.Operators.IntColumn;
+import parquet.filter2.predicate.Operators.LongColumn;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static parquet.filter2.predicate.FilterApi.binaryColumn;
+import static parquet.filter2.predicate.FilterApi.booleanColumn;
+import static parquet.filter2.predicate.FilterApi.doubleColumn;
+import static parquet.filter2.predicate.FilterApi.floatColumn;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.longColumn;
+import static parquet.filter2.predicate.ValidTypeMap.assertTypeValid;
+
+/**
+ * Tests for {@link ValidTypeMap#assertTypeValid}: verifies that each Column
+ * kind from FilterApi is accepted for the matching parquet primitive (and
+ * original) type, and that mismatches fail with a descriptive message.
+ */
+public class TestValidTypeMap {
+ // one column of each type supported in FilterPredicates
+ public static IntColumn intColumn = intColumn("int.column");
+ public static LongColumn longColumn = longColumn("long.column");
+ public static FloatColumn floatColumn = floatColumn("float.column");
+ public static DoubleColumn doubleColumn = doubleColumn("double.column");
+ public static BooleanColumn booleanColumn = booleanColumn("boolean.column");
+ public static BinaryColumn binaryColumn = binaryColumn("binary.column");
+
+ // a column value type that the type map does not (and should not) recognize
+ private static class InvalidColumnType implements Comparable<InvalidColumnType> {
+ @Override
+ public int compareTo(InvalidColumnType o) {
+ return 0;
+ }
+ }
+
+ public static Column<InvalidColumnType> invalidColumn =
+ new Column<InvalidColumnType>(ColumnPath.get("invalid.column"), InvalidColumnType.class) { };
+
+ // every supported (column type, primitive type, original type) pairing is accepted;
+ // note binary is valid for both BINARY and FIXED_LEN_BYTE_ARRAY, with or without UTF8
+ @Test
+ public void testValidTypes() {
+ assertTypeValid(intColumn, PrimitiveTypeName.INT32, null);
+ assertTypeValid(longColumn, PrimitiveTypeName.INT64, null);
+ assertTypeValid(floatColumn, PrimitiveTypeName.FLOAT, null);
+ assertTypeValid(doubleColumn, PrimitiveTypeName.DOUBLE, null);
+ assertTypeValid(booleanColumn, PrimitiveTypeName.BOOLEAN, null);
+ assertTypeValid(binaryColumn, PrimitiveTypeName.BINARY, null);
+ assertTypeValid(binaryColumn, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, null);
+ assertTypeValid(binaryColumn, PrimitiveTypeName.BINARY, OriginalType.UTF8);
+ assertTypeValid(binaryColumn, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, OriginalType.UTF8);
+ }
+
+ // a known column type used against the wrong schema type must name both types
+ // and suggest the valid alternatives
+ @Test
+ public void testMismatchedTypes() {
+ try {
+ assertTypeValid(intColumn, PrimitiveTypeName.DOUBLE, null);
+ fail("This should throw!");
+ } catch (IllegalArgumentException e) {
+ assertEquals("FilterPredicate column: int.column's declared type (java.lang.Integer) does not match the "
+ + "schema found in file metadata. Column int.column is of type: "
+ + "FullTypeDescriptor(PrimitiveType: DOUBLE, OriginalType: null)\n"
+ + "Valid types for this column are: [class java.lang.Double]", e.getMessage());
+ }
+ }
+
+ // an unrecognized column type must be rejected; the message differs depending on
+ // whether the schema's (primitive, original) type pair has any supported types at all
+ @Test
+ public void testUnsupportedType() {
+ try {
+ assertTypeValid(invalidColumn, PrimitiveTypeName.INT32, null);
+ fail("This should throw!");
+ } catch (IllegalArgumentException e) {
+ assertEquals("Column invalid.column was declared as type: "
+ + "parquet.filter2.predicate.TestValidTypeMap$InvalidColumnType which is not supported "
+ + "in FilterPredicates. Supported types for this column are: [class java.lang.Integer]", e.getMessage());
+ }
+
+ try {
+ assertTypeValid(invalidColumn, PrimitiveTypeName.INT32, OriginalType.UTF8);
+ fail("This should throw!");
+ } catch (IllegalArgumentException e) {
+ assertEquals("Column invalid.column was declared as type: "
+ + "parquet.filter2.predicate.TestValidTypeMap$InvalidColumnType which is not supported "
+ + "in FilterPredicates. There are no supported types for columns of FullTypeDescriptor(PrimitiveType: INT32, OriginalType: UTF8)",
+ e.getMessage());
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateEvaluator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateEvaluator.java b/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateEvaluator.java
new file mode 100644
index 0000000..08b7a04
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateEvaluator.java
@@ -0,0 +1,191 @@
+package parquet.filter2.recordlevel;
+
+import org.junit.Test;
+
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicateEvaluator.evaluate;
+
+/**
+ * Tests for IncrementallyUpdatedFilterPredicateEvaluator#evaluate.
+ *
+ * NOTE: the public static ValueInspector factory methods defined here are also
+ * imported by TestIncrementallyUpdatedFilterPredicateResetter and TestValueInspector.
+ */
+public class TestIncrementallyUpdatedFilterPredicateEvaluator {
+
+ // thrown by inspectors that must never be visited, to prove short circuiting
+ public static class ShortCircuitException extends RuntimeException {
+ public ShortCircuitException() {
+ super("this was supposed to short circuit and never get here!");
+ }
+ }
+
+ // inspector whose criteria is "the value is null"
+ public static ValueInspector intIsNull() {
+ return new ValueInspector() {
+ @Override
+ public void updateNull() {
+ setResult(true);
+ }
+
+ @Override
+ public void update(int value) {
+ setResult(false);
+ }
+ };
+ }
+
+ // inspector whose criteria is "the int value is even"; null is considered not even
+ public static ValueInspector intIsEven() {
+ return new ValueInspector() {
+ @Override
+ public void updateNull() {
+ setResult(false);
+ }
+
+ @Override
+ public void update(int value) {
+ setResult(value % 2 == 0);
+ }
+ };
+ }
+
+ // inspector whose criteria is "the double value is > 10.0"; null fails the criteria
+ public static ValueInspector doubleMoreThan10() {
+ return new ValueInspector() {
+ @Override
+ public void updateNull() {
+ setResult(false);
+ }
+
+ @Override
+ public void update(double value) {
+ setResult(value > 10.0);
+ }
+ };
+ }
+
+ @Test
+ public void testValueInspector() {
+ // known, and set to false criteria, null considered false
+ ValueInspector v = intIsEven();
+ v.update(11);
+ assertFalse(evaluate(v));
+ v.reset();
+
+ // known and set to true criteria, null considered false
+ v.update(12);
+ assertTrue(evaluate(v));
+ v.reset();
+
+ // known and set to null, null considered false
+ v.updateNull();
+ assertFalse(evaluate(v));
+ v.reset();
+
+ // known, and set to false criteria, null considered true
+ ValueInspector intIsNull = intIsNull();
+ intIsNull.update(10);
+ assertFalse(evaluate(intIsNull));
+ intIsNull.reset();
+
+ // known, and set to null, null considered true
+ intIsNull.updateNull();
+ assertTrue(evaluate(intIsNull));
+ intIsNull.reset();
+
+ // unknown, null considered false
+ v.reset();
+ assertFalse(evaluate(v));
+
+ // unknown, null considered true
+ intIsNull.reset();
+ assertTrue(evaluate(intIsNull));
+ }
+
+ // evaluates (v1 || v2) after feeding each inspector one int, then resets both
+ private void doOrTest(ValueInspector v1, ValueInspector v2, int v1Value, int v2Value, boolean expected) {
+ v1.update(v1Value);
+ v2.update(v2Value);
+ IncrementallyUpdatedFilterPredicate or = new Or(v1, v2);
+ assertEquals(expected, evaluate(or));
+ v1.reset();
+ v2.reset();
+ }
+
+ // evaluates (v1 && v2) after feeding each inspector one int, then resets both
+ private void doAndTest(ValueInspector v1, ValueInspector v2, int v1Value, int v2Value, boolean expected) {
+ v1.update(v1Value);
+ v2.update(v2Value);
+ IncrementallyUpdatedFilterPredicate and = new And(v1, v2);
+ assertEquals(expected, evaluate(and));
+ v1.reset();
+ v2.reset();
+ }
+
+
+ // exhaustive truth table for Or; 11 is odd (false), 12 is even (true)
+ @Test
+ public void testOr() {
+ ValueInspector v1 = intIsEven();
+ ValueInspector v2 = intIsEven();
+
+ int F = 11;
+ int T = 12;
+
+ // F || F == F
+ doOrTest(v1, v2, F, F, false);
+ // F || T == T
+ doOrTest(v1, v2, F, T, true);
+ // T || F == T
+ doOrTest(v1, v2, T, F, true);
+ // T || T == T
+ doOrTest(v1, v2, T, T, true);
+
+ }
+
+ // exhaustive truth table for And; 11 is odd (false), 12 is even (true)
+ @Test
+ public void testAnd() {
+ ValueInspector v1 = intIsEven();
+ ValueInspector v2 = intIsEven();
+
+ int F = 11;
+ int T = 12;
+
+ // F && F == F
+ doAndTest(v1, v2, F, F, false);
+ // F && T == F
+ doAndTest(v1, v2, F, T, false);
+ // T && F == F
+ doAndTest(v1, v2, T, F, false);
+ // T && T == T
+ doAndTest(v1, v2, T, T, true);
+
+ }
+
+ @Test
+ public void testShortCircuit() {
+ // an inspector that blows up if the evaluator ever visits it
+ ValueInspector neverCalled = new ValueInspector() {
+ @Override
+ public boolean accept(Visitor visitor) {
+ throw new ShortCircuitException();
+ }
+ };
+
+ // sanity check: evaluating it directly does throw
+ try {
+ evaluate(neverCalled);
+ fail("this should throw");
+ } catch (ShortCircuitException e) {
+ //
+ }
+
+ // T || X should evaluate to true without inspecting X
+ ValueInspector v = intIsEven();
+ v.update(10);
+ IncrementallyUpdatedFilterPredicate or = new Or(v, neverCalled);
+ assertTrue(evaluate(or));
+ v.reset();
+
+ // F && X should evaluate to false without inspecting X
+ v.update(11);
+ IncrementallyUpdatedFilterPredicate and = new And(v, neverCalled);
+ assertFalse(evaluate(and));
+ v.reset();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateResetter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateResetter.java b/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateResetter.java
new file mode 100644
index 0000000..974d6e7
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateResetter.java
@@ -0,0 +1,51 @@
+package parquet.filter2.recordlevel;
+
+
+import org.junit.Test;
+
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static parquet.filter2.recordlevel.TestIncrementallyUpdatedFilterPredicateEvaluator.doubleMoreThan10;
+import static parquet.filter2.recordlevel.TestIncrementallyUpdatedFilterPredicateEvaluator.intIsEven;
+import static parquet.filter2.recordlevel.TestIncrementallyUpdatedFilterPredicateEvaluator.intIsNull;
+
+public class TestIncrementallyUpdatedFilterPredicateResetter {
+  @Test
+  public void testReset() {
+    // reset() must return every leaf inspector in the predicate tree to the unknown state
+    ValueInspector nullCheck = intIsNull();
+    ValueInspector evenCheck = intIsEven();
+    ValueInspector doubleCheck = doubleMoreThan10();
+
+    IncrementallyUpdatedFilterPredicate pred =
+        new Or(nullCheck, new And(evenCheck, doubleCheck));
+
+    nullCheck.updateNull();
+    evenCheck.update(11);
+    doubleCheck.update(20.0D);
+
+    assertTrue(nullCheck.isKnown());
+    assertTrue(evenCheck.isKnown());
+    assertTrue(doubleCheck.isKnown());
+
+    IncrementallyUpdatedFilterPredicateResetter.reset(pred);
+
+    assertFalse(nullCheck.isKnown());
+    assertFalse(evenCheck.isKnown());
+    assertFalse(doubleCheck.isKnown());
+
+    // updating one leaf must not affect the others...
+    nullCheck.updateNull();
+    assertTrue(nullCheck.isKnown());
+    assertFalse(evenCheck.isKnown());
+    assertFalse(doubleCheck.isKnown());
+
+    // ...and a second reset clears a partially-updated tree as well
+    IncrementallyUpdatedFilterPredicateResetter.reset(pred);
+    assertFalse(nullCheck.isKnown());
+    assertFalse(evenCheck.isKnown());
+    assertFalse(doubleCheck.isKnown());
+
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/recordlevel/TestValueInspector.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/recordlevel/TestValueInspector.java b/parquet-column/src/test/java/parquet/filter2/recordlevel/TestValueInspector.java
new file mode 100644
index 0000000..fc2e587
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/recordlevel/TestValueInspector.java
@@ -0,0 +1,79 @@
+package parquet.filter2.recordlevel;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static parquet.filter2.recordlevel.TestIncrementallyUpdatedFilterPredicateEvaluator.intIsEven;
+
+public class TestValueInspector {
+
+  // asserts that getResult() throws because the inspector's state is not yet known
+  private void assertResultNotKnown(ValueInspector v) {
+    try {
+      v.getResult();
+      fail("this should throw");
+    } catch (IllegalStateException e) {
+      assertEquals("getResult() called on a ValueInspector whose result is not yet known!", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testLifeCycle() {
+    ValueInspector v = intIsEven();
+
+    // begins in the unknown state; getResult() is illegal until an update arrives
+    assertFalse(v.isKnown());
+    assertResultNotKnown(v);
+
+    // 10 is even, so after this update the result is known and true
+    v.update(10);
+    assertTrue(v.isKnown());
+    assertTrue(v.getResult());
+
+    // a second update without an intervening reset() must be rejected
+    try {
+      v.update(11);
+      fail("this should throw");
+    } catch (IllegalStateException e) {
+      assertEquals("setResult() called on a ValueInspector whose result is already known!"
+          + " Did you forget to call reset()?", e.getMessage());
+    }
+
+    // reset() returns the inspector to the unknown state
+    v.reset();
+    assertFalse(v.isKnown());
+    assertResultNotKnown(v);
+
+    // 11 is odd, so the result is known and false
+    v.update(11);
+    assertTrue(v.isKnown());
+    assertFalse(v.getResult());
+
+  }
+
+  @Test
+  public void testReusable() {
+    // a single inspector can be cycled through many update/reset rounds
+    ValueInspector v = intIsEven();
+    for (int x : new int[] { 2, 4, 7, 3, 8, 8, 11, 200 }) {
+      v.update(x);
+      assertEquals(x % 2 == 0, v.getResult());
+      v.reset();
+    }
+
+  }
+}