Posted to commits@parquet.apache.org by ju...@apache.org on 2014/07/29 23:39:17 UTC

[3/4] Add a unified and optionally more constrained API for expressing filters on columns

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
new file mode 100644
index 0000000..4cdedf2
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java
@@ -0,0 +1,91 @@
+package parquet.filter2.recordlevel;
+
+import parquet.column.Dictionary;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import parquet.io.api.Binary;
+import parquet.io.api.PrimitiveConverter;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * See {@link FilteringRecordMaterializer}.
+ *
+ * This pass-through proxy for a delegate {@link PrimitiveConverter} also
+ * updates the {@link ValueInspector}s of an {@link IncrementallyUpdatedFilterPredicate}.
+ */
+public class FilteringPrimitiveConverter extends PrimitiveConverter {
+  private final PrimitiveConverter delegate;
+  private final ValueInspector[] valueInspectors;
+
+  public FilteringPrimitiveConverter(PrimitiveConverter delegate, ValueInspector[] valueInspectors) {
+    this.delegate = checkNotNull(delegate, "delegate");
+    this.valueInspectors = checkNotNull(valueInspectors, "valueInspectors");
+  }
+
+  // TODO: this works, but
+  // TODO: essentially turns off the benefits of dictionary support
+  // TODO: even if the underlying delegate supports it.
+  // TODO: we should support it here. (https://issues.apache.org/jira/browse/PARQUET-36)
+  @Override
+  public boolean hasDictionarySupport() {
+    return false;
+  }
+
+  @Override
+  public void setDictionary(Dictionary dictionary) {
+    throw new UnsupportedOperationException("FilteringPrimitiveConverter doesn't have dictionary support");
+  }
+
+  @Override
+  public void addValueFromDictionary(int dictionaryId) {
+    throw new UnsupportedOperationException("FilteringPrimitiveConverter doesn't have dictionary support");
+  }
+
+  @Override
+  public void addBinary(Binary value) {
+    for (ValueInspector valueInspector : valueInspectors) {
+      valueInspector.update(value);
+    }
+    delegate.addBinary(value);
+  }
+
+  @Override
+  public void addBoolean(boolean value) {
+    for (ValueInspector valueInspector : valueInspectors) {
+      valueInspector.update(value);
+    }
+    delegate.addBoolean(value);
+  }
+
+  @Override
+  public void addDouble(double value) {
+    for (ValueInspector valueInspector : valueInspectors) {
+      valueInspector.update(value);
+    }
+    delegate.addDouble(value);
+  }
+
+  @Override
+  public void addFloat(float value) {
+    for (ValueInspector valueInspector : valueInspectors) {
+      valueInspector.update(value);
+    }
+    delegate.addFloat(value);
+  }
+
+  @Override
+  public void addInt(int value) {
+    for (ValueInspector valueInspector : valueInspectors) {
+      valueInspector.update(value);
+    }
+    delegate.addInt(value);
+  }
+
+  @Override
+  public void addLong(long value) {
+    for (ValueInspector valueInspector : valueInspectors) {
+      valueInspector.update(value);
+    }
+    delegate.addLong(value);
+  }
+}
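
For context on the ValueInspector calls above: each leaf receives values through the
update(...) overloads and records its verdict with setResult(). Below is a minimal
hypothetical sketch, not part of this patch (the real inspectors are produced by the
generated builder, and ValueInspector's package-private constructor means this code
would live inside parquet.filter2.recordlevel), of a leaf for eq(intColumn, 7):

    ValueInspector eqSeven = new ValueInspector() {
      @Override
      public void update(int value) {
        setResult(value == 7); // known as soon as the column's value arrives
      }

      @Override
      public void updateNull() {
        setResult(false); // a null value cannot equal 7
      }
    };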

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringRecordMaterializer.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
new file mode 100644
index 0000000..41dd5d3
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringRecordMaterializer.java
@@ -0,0 +1,97 @@
+package parquet.filter2.recordlevel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import parquet.io.PrimitiveColumnIO;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.RecordMaterializer;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * A pass-through proxy for a {@link RecordMaterializer} that updates an {@link IncrementallyUpdatedFilterPredicate}
+ * as it receives concrete values for the current record. Once record assembly signals that there are
+ * no more values, if the predicate indicates that this record should be dropped, {@link #getCurrentRecord()}
+ * returns null to signal that this record is being skipped.
+ * Otherwise, the record is retrieved from the delegate.
+ */
+public class FilteringRecordMaterializer<T> extends RecordMaterializer<T> {
+  // the real record materializer
+  private final RecordMaterializer<T> delegate;
+
+  // the proxied root converter
+  private final FilteringGroupConverter rootConverter;
+
+  // the predicate
+  private final IncrementallyUpdatedFilterPredicate filterPredicate;
+
+  public FilteringRecordMaterializer(
+      RecordMaterializer<T> delegate,
+      List<PrimitiveColumnIO> columnIOs,
+      Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn,
+      IncrementallyUpdatedFilterPredicate filterPredicate) {
+
+    checkNotNull(columnIOs, "columnIOs");
+    checkNotNull(valueInspectorsByColumn, "valueInspectorsByColumn");
+    this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
+    this.delegate = checkNotNull(delegate, "delegate");
+
+    // keep track of which path of indices leads to which primitive column
+    Map<List<Integer>, PrimitiveColumnIO> columnIOsByIndexFieldPath = new HashMap<List<Integer>, PrimitiveColumnIO>();
+
+    for (PrimitiveColumnIO c : columnIOs) {
+      columnIOsByIndexFieldPath.put(getIndexFieldPathList(c), c);
+    }
+
+    // create a proxy for the delegate's root converter
+    this.rootConverter = new FilteringGroupConverter(
+        delegate.getRootConverter(), Collections.<Integer>emptyList(), valueInspectorsByColumn, columnIOsByIndexFieldPath);
+  }
+
+  public static List<Integer> getIndexFieldPathList(PrimitiveColumnIO c) {
+    return intArrayToList(c.getIndexFieldPath());
+  }
+
+  public static List<Integer> intArrayToList(int[] arr) {
+    List<Integer> list = new ArrayList<Integer>(arr.length);
+    for (int i : arr) {
+      list.add(i);
+    }
+    return list;
+  }
+
+
+
+  @Override
+  public T getCurrentRecord() {
+
+    // find out if the predicate thinks we should keep this record
+    boolean keep = IncrementallyUpdatedFilterPredicateEvaluator.evaluate(filterPredicate);
+
+    // reset the stateful predicate no matter what
+    IncrementallyUpdatedFilterPredicateResetter.reset(filterPredicate);
+
+    if (keep) {
+      return delegate.getCurrentRecord();
+    } else {
+      // signals a skip
+      return null;
+    }
+  }
+
+  @Override
+  public void skipCurrentRecord() {
+    delegate.skipCurrentRecord();
+  }
+
+  @Override
+  public GroupConverter getRootConverter() {
+    return rootConverter;
+  }
+}
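
The index field path used as the map key above identifies a primitive column by its
field position at each nesting level. A small hedged example of the public helpers:

    // e.g. the second field of the first field of the root message:
    List<Integer> path = FilteringRecordMaterializer.intArrayToList(new int[] {0, 1});
    // path is [0, 1], the same key produced by getIndexFieldPathList(...)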

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
new file mode 100644
index 0000000..457f0c9
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java
@@ -0,0 +1,139 @@
+package parquet.filter2.recordlevel;
+
+import parquet.io.api.Binary;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * A rewritten version of a {@link parquet.filter2.predicate.FilterPredicate} which receives
+ * the values for a record's columns one by one and internally tracks whether the predicate is
+ * satisfied, unsatisfied, or unknown.
+ *
+ * This is used to apply a predicate during record assembly, without assembling a second copy of
+ * a record, and without building a stack of update events.
+ *
+ * IncrementallyUpdatedFilterPredicate is implemented via the visitor pattern, as is
+ * {@link parquet.filter2.predicate.FilterPredicate}.
+ */
+public interface IncrementallyUpdatedFilterPredicate {
+
+  /**
+   * A Visitor for an {@link IncrementallyUpdatedFilterPredicate}, per the visitor pattern.
+   */
+  public static interface Visitor {
+    boolean visit(ValueInspector p);
+    boolean visit(And and);
+    boolean visit(Or or);
+  }
+
+  /**
+   * An {@link IncrementallyUpdatedFilterPredicate} must accept a {@link Visitor}, per the visitor pattern.
+   */
+  boolean accept(Visitor visitor);
+
+  /**
+   * This is the leaf node of a filter predicate. It receives the value for the primitive column it represents,
+   * and decides whether or not the predicate represented by this node is satisfied.
+   *
+   * It is stateful, and needs to be reset after use.
+   */
+  public static abstract class ValueInspector implements IncrementallyUpdatedFilterPredicate {
+    // package private constructor
+    ValueInspector() { }
+
+    private boolean result = false;
+    private boolean isKnown = false;
+
+    // these methods signal what the value is
+    public void updateNull() { throw new UnsupportedOperationException(); }
+    public void update(int value) { throw new UnsupportedOperationException(); }
+    public void update(long value) { throw new UnsupportedOperationException(); }
+    public void update(double value) { throw new UnsupportedOperationException(); }
+    public void update(float value) { throw new UnsupportedOperationException(); }
+    public void update(boolean value) { throw new UnsupportedOperationException(); }
+    public void update(Binary value) { throw new UnsupportedOperationException(); }
+
+    /**
+     * Reset to clear state and begin evaluating the next record.
+     */
+    public final void reset() {
+      isKnown = false;
+      result = false;
+    }
+
+    /**
+     * Subclasses should call this method to signal that the result of this predicate is known.
+     */
+    protected final void setResult(boolean result) {
+      if (isKnown) {
+        throw new IllegalStateException("setResult() called on a ValueInspector whose result is already known!"
+          + " Did you forget to call reset()?");
+      }
+      this.result = result;
+      this.isKnown = true;
+    }
+
+    /**
+     * Should only be called if {@link #isKnown} returns true.
+     */
+    public final boolean getResult() {
+      if (!isKnown) {
+        throw new IllegalStateException("getResult() called on a ValueInspector whose result is not yet known!");
+      }
+      return result;
+    }
+
+    /**
+     * Returns true if this inspector has received a value for the current record, false otherwise.
+     */
+    public final boolean isKnown() {
+      return isKnown;
+    }
+
+    @Override
+    public boolean accept(Visitor visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  // base class for and / or
+  static abstract class BinaryLogical implements IncrementallyUpdatedFilterPredicate {
+    private final IncrementallyUpdatedFilterPredicate left;
+    private final IncrementallyUpdatedFilterPredicate right;
+
+    BinaryLogical(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
+      this.left = checkNotNull(left, "left");
+      this.right = checkNotNull(right, "right");
+    }
+
+    public final IncrementallyUpdatedFilterPredicate getLeft() {
+      return left;
+    }
+
+    public final IncrementallyUpdatedFilterPredicate getRight() {
+      return right;
+    }
+  }
+
+  public static final class Or extends BinaryLogical {
+    Or(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
+      super(left, right);
+    }
+
+    @Override
+    public boolean accept(Visitor visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  public static final class And extends BinaryLogical {
+    And(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) {
+      super(left, right);
+    }
+
+    @Override
+    public boolean accept(Visitor visitor) {
+      return visitor.visit(this);
+    }
+  }
+}
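
To make the lifecycle concrete, here is a hedged sketch of the per-record cycle (the
And constructor is package-private, so this would sit inside parquet.filter2.recordlevel;
leftLeaf and rightLeaf are hypothetical ValueInspector subclasses):

    IncrementallyUpdatedFilterPredicate tree =
        new IncrementallyUpdatedFilterPredicate.And(leftLeaf, rightLeaf);

    // during assembly, converters push values into the leaves:
    leftLeaf.update(7);

    // at the end of the record, evaluate, then reset for the next record:
    boolean keep = IncrementallyUpdatedFilterPredicateEvaluator.evaluate(tree);
    IncrementallyUpdatedFilterPredicateResetter.reset(tree);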

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
new file mode 100644
index 0000000..9481738
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java
@@ -0,0 +1,79 @@
+package parquet.filter2.recordlevel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.predicate.FilterPredicate;
+import parquet.filter2.predicate.FilterPredicate.Visitor;
+import parquet.filter2.predicate.Operators.And;
+import parquet.filter2.predicate.Operators.Not;
+import parquet.filter2.predicate.Operators.Or;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+
+import static parquet.Preconditions.checkArgument;
+
+/**
+ * The implementation of this abstract class is auto-generated by
+ * {@link parquet.filter2.IncrementallyUpdatedFilterPredicateGenerator}.
+ *
+ * Constructs an {@link IncrementallyUpdatedFilterPredicate} from a {@link parquet.filter2.predicate.FilterPredicate}.
+ * This is how records are filtered during record assembly. The implementation is generated in order to avoid autoboxing.
+ *
+ * Note: the supplied predicate must not contain any instances of the not() operator, as this is not
+ * supported by this filter.
+ *
+ * The supplied predicate should first be run through {@link parquet.filter2.predicate.LogicalInverseRewriter} to rewrite it
+ * into a form that doesn't make use of the not() operator.
+ *
+ * The supplied predicate should also have already been run through
+ * {@link parquet.filter2.predicate.SchemaCompatibilityValidator}
+ * to make sure it is compatible with the schema of this file.
+ *
+ * TODO: UserDefinedPredicates still autobox, however
+ */
+public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements Visitor<IncrementallyUpdatedFilterPredicate> {
+  private boolean built = false;
+  private final Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn = new HashMap<ColumnPath, List<ValueInspector>>();
+
+  public IncrementallyUpdatedFilterPredicateBuilderBase() { }
+
+  public final IncrementallyUpdatedFilterPredicate build(FilterPredicate pred) {
+    checkArgument(!built, "This builder has already been used");
+    IncrementallyUpdatedFilterPredicate incremental = pred.accept(this);
+    built = true;
+    return incremental;
+  }
+
+  protected final void addValueInspector(ColumnPath columnPath, ValueInspector valueInspector) {
+    List<ValueInspector> valueInspectors = valueInspectorsByColumn.get(columnPath);
+    if (valueInspectors == null) {
+      valueInspectors = new ArrayList<ValueInspector>();
+      valueInspectorsByColumn.put(columnPath, valueInspectors);
+    }
+    valueInspectors.add(valueInspector);
+  }
+
+  public Map<ColumnPath, List<ValueInspector>> getValueInspectorsByColumn() {
+    return valueInspectorsByColumn;
+  }
+
+  @Override
+  public final IncrementallyUpdatedFilterPredicate visit(And and) {
+    return new IncrementallyUpdatedFilterPredicate.And(and.getLeft().accept(this), and.getRight().accept(this));
+  }
+
+  @Override
+  public final IncrementallyUpdatedFilterPredicate visit(Or or) {
+    return new IncrementallyUpdatedFilterPredicate.Or(or.getLeft().accept(this), or.getRight().accept(this));
+  }
+
+  @Override
+  public final IncrementallyUpdatedFilterPredicate visit(Not not) {
+    throw new IllegalArgumentException(
+        "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
+  }
+
+}
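
Because visit(Not) throws, callers are expected to collapse not() before building.
A sketch of the intended call order, using only classes referenced in this patch:

    FilterPredicate raw = not(eq(intColumn("a.b.c"), 7));
    FilterPredicate rewritten = LogicalInverseRewriter.rewrite(raw); // becomes notEq(a.b.c, 7)
    IncrementallyUpdatedFilterPredicate incremental =
        new IncrementallyUpdatedFilterPredicateBuilder().build(rewritten);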

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
new file mode 100644
index 0000000..7536d8e
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java
@@ -0,0 +1,45 @@
+package parquet.filter2.recordlevel;
+
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Visitor;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * Determines whether an {@link IncrementallyUpdatedFilterPredicate} is satisfied or not.
+ * This implementation makes the assumption that all {@link ValueInspector}s in an unknown state
+ * represent columns with a null value, and updates them accordingly.
+ *
+ * TODO: We could also build an evaluator that detects if enough values are known to determine the outcome
+ * TODO: of the predicate and quit the record assembly early. (https://issues.apache.org/jira/browse/PARQUET-37)
+ */
+public class IncrementallyUpdatedFilterPredicateEvaluator implements Visitor {
+  private static final IncrementallyUpdatedFilterPredicateEvaluator INSTANCE = new IncrementallyUpdatedFilterPredicateEvaluator();
+
+  public static boolean evaluate(IncrementallyUpdatedFilterPredicate pred) {
+    checkNotNull(pred, "pred");
+    return pred.accept(INSTANCE);
+  }
+
+  private IncrementallyUpdatedFilterPredicateEvaluator() {}
+
+  @Override
+  public boolean visit(ValueInspector p) {
+    if (!p.isKnown()) {
+      p.updateNull();
+    }
+    return p.getResult();
+  }
+
+  @Override
+  public boolean visit(And and) {
+    return and.getLeft().accept(this) && and.getRight().accept(this);
+  }
+
+  @Override
+  public boolean visit(Or or) {
+    return or.getLeft().accept(this) || or.getRight().accept(this);
+  }
+}
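
In other words, a leaf that never saw a value for the current record is fed updateNull()
at evaluation time. Continuing the hypothetical eqSeven inspector sketched earlier:

    // the record ended without the int column appearing; evaluate() sees
    // eqSeven.isKnown() == false, calls eqSeven.updateNull(), then reads the result:
    boolean keep = IncrementallyUpdatedFilterPredicateEvaluator.evaluate(eqSeven);
    // keep == false, because updateNull() set the result to false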

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
new file mode 100644
index 0000000..c75ef45
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java
@@ -0,0 +1,42 @@
+package parquet.filter2.recordlevel;
+
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Visitor;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * Resets all the {@link ValueInspector}s in an {@link IncrementallyUpdatedFilterPredicate}.
+ */
+public final class IncrementallyUpdatedFilterPredicateResetter implements Visitor {
+  private static final IncrementallyUpdatedFilterPredicateResetter INSTANCE = new IncrementallyUpdatedFilterPredicateResetter();
+
+  public static void reset(IncrementallyUpdatedFilterPredicate pred) {
+    checkNotNull(pred, "pred");
+    pred.accept(INSTANCE);
+  }
+
+  private IncrementallyUpdatedFilterPredicateResetter() { }
+
+  @Override
+  public boolean visit(ValueInspector p) {
+    p.reset();
+    return false;
+  }
+
+  @Override
+  public boolean visit(And and) {
+    and.getLeft().accept(this);
+    and.getRight().accept(this);
+    return false;
+  }
+
+  @Override
+  public boolean visit(Or or) {
+    or.getLeft().accept(this);
+    or.getRight().accept(this);
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java b/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java
index b6239cb..a1a51c2 100644
--- a/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java
+++ b/parquet-column/src/main/java/parquet/io/FilteredRecordReader.java
@@ -62,6 +62,12 @@ class FilteredRecordReader<T> extends RecordReaderImplementation<T> {
     return super.read();
   }
 
+  // FilteredRecordReader skips forward itself; it never asks the layer above to do the skipping for it.
+  // This is different from how filtering is handled in the filter2 API.
+  @Override
+  public boolean shouldSkipCurrentRecord() {
+    return false;
+  }
 
   /**
    * Skips forwards until the filter finds the first match. Returns false

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
index c1ffbe6..bc048b0 100644
--- a/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
+++ b/parquet-column/src/main/java/parquet/io/MessageColumnIO.java
@@ -25,11 +25,23 @@ import parquet.column.ColumnWriter;
 import parquet.column.impl.ColumnReadStoreImpl;
 import parquet.column.page.PageReadStore;
 import parquet.filter.UnboundRecordFilter;
+import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.FilterCompat.Filter;
+import parquet.filter2.compat.FilterCompat.FilterPredicateCompat;
+import parquet.filter2.compat.FilterCompat.NoOpFilter;
+import parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat;
+import parquet.filter2.compat.FilterCompat.Visitor;
+import parquet.filter2.predicate.FilterPredicate;
+import parquet.filter2.recordlevel.FilteringRecordMaterializer;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicateBuilder;
 import parquet.io.api.Binary;
 import parquet.io.api.RecordConsumer;
 import parquet.io.api.RecordMaterializer;
 import parquet.schema.MessageType;
 
+import static parquet.Preconditions.checkNotNull;
+
 /**
  * Message level of the IO structure
  *
@@ -55,32 +67,74 @@ public class MessageColumnIO extends GroupColumnIO {
     return super.getColumnNames();
   }
 
-  public <T> RecordReader<T> getRecordReader(PageReadStore columns, RecordMaterializer<T> recordMaterializer) {
-    if (leaves.size() > 0) {
-      return new RecordReaderImplementation<T>(
-        this,
-        recordMaterializer,
-        validating,
-        new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType())
-      );
-    } else {
+  public <T> RecordReader<T> getRecordReader(PageReadStore columns,
+                                             RecordMaterializer<T> recordMaterializer) {
+    return getRecordReader(columns, recordMaterializer, FilterCompat.NOOP);
+  }
+
+  /**
+   * @deprecated use {@link #getRecordReader(PageReadStore, RecordMaterializer, Filter)}
+   */
+  @Deprecated
+  public <T> RecordReader<T> getRecordReader(PageReadStore columns,
+                                             RecordMaterializer<T> recordMaterializer,
+                                             UnboundRecordFilter filter) {
+    return getRecordReader(columns, recordMaterializer, FilterCompat.get(filter));
+  }
+
+  public <T> RecordReader<T> getRecordReader(final PageReadStore columns,
+                                             final RecordMaterializer<T> recordMaterializer,
+                                             final Filter filter) {
+    checkNotNull(columns, "columns");
+    checkNotNull(recordMaterializer, "recordMaterializer");
+    checkNotNull(filter, "filter");
+
+    if (leaves.isEmpty()) {
       return new EmptyRecordReader<T>(recordMaterializer);
     }
-  }
 
-  public <T> RecordReader<T> getRecordReader(PageReadStore columns, RecordMaterializer<T> recordMaterializer,
-                                             UnboundRecordFilter unboundFilter) {
-
-    return (unboundFilter == null)
-      ? getRecordReader(columns, recordMaterializer)
-      : new FilteredRecordReader<T>(
-        this,
-        recordMaterializer,
-        validating,
-        new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()),
-        unboundFilter,
-        columns.getRowCount()
-    );
+    return filter.accept(new Visitor<RecordReader<T>>() {
+      @Override
+      public RecordReader<T> visit(FilterPredicateCompat filterPredicateCompat) {
+
+        FilterPredicate predicate = filterPredicateCompat.getFilterPredicate();
+        IncrementallyUpdatedFilterPredicateBuilder builder = new IncrementallyUpdatedFilterPredicateBuilder();
+        IncrementallyUpdatedFilterPredicate streamingPredicate = builder.build(predicate);
+        RecordMaterializer<T> filteringRecordMaterializer = new FilteringRecordMaterializer<T>(
+            recordMaterializer,
+            leaves,
+            builder.getValueInspectorsByColumn(),
+            streamingPredicate);
+
+        return new RecordReaderImplementation<T>(
+            MessageColumnIO.this,
+            filteringRecordMaterializer,
+            validating,
+            new ColumnReadStoreImpl(columns, filteringRecordMaterializer.getRootConverter(), getType()));
+      }
+
+      @Override
+      public RecordReader<T> visit(UnboundRecordFilterCompat unboundRecordFilterCompat) {
+        return new FilteredRecordReader<T>(
+            MessageColumnIO.this,
+            recordMaterializer,
+            validating,
+            new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()),
+            unboundRecordFilterCompat.getUnboundRecordFilter(),
+            columns.getRowCount()
+        );
+
+      }
+
+      @Override
+      public RecordReader<T> visit(NoOpFilter noOpFilter) {
+        return new RecordReaderImplementation<T>(
+            MessageColumnIO.this,
+            recordMaterializer,
+            validating,
+            new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()));
+      }
+    });
   }
 
   private class MessageColumnIORecordConsumer extends RecordConsumer {
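
A hedged usage sketch of the new entry point (pages, materializer, and MyRecord are
placeholders, and FilterCompat is assumed to expose a get(FilterPredicate) factory
mirroring the get(UnboundRecordFilter) overload used above):

    FilterPredicate pred = eq(intColumn("a.b.c"), 7);
    RecordReader<MyRecord> reader =
        columnIO.getRecordReader(pages, materializer, FilterCompat.get(pred));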

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/io/RecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/RecordReader.java b/parquet-column/src/main/java/parquet/io/RecordReader.java
index f01b02a..e0cfeb6 100644
--- a/parquet-column/src/main/java/parquet/io/RecordReader.java
+++ b/parquet-column/src/main/java/parquet/io/RecordReader.java
@@ -25,9 +25,16 @@ package parquet.io;
 public abstract class RecordReader<T> {
 
   /**
-   * reads one record and returns it
+   * Reads one record and returns it.
    * @return the materialized record
    */
   public abstract T read();
 
+  /**
+   * Returns whether the current record should be skipped (dropped).
+   * Will be called *after* read().
+   */
+  public boolean shouldSkipCurrentRecord() {
+    return false;
+  }
 }
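
A hedged sketch of how a caller is expected to drive this contract (reader, rowCount,
and process(...) are placeholders):

    for (long i = 0; i < rowCount; i++) {
      T record = reader.read();
      if (reader.shouldSkipCurrentRecord()) {
        continue; // the filter dropped this record
      }
      process(record);
    }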

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
index c5d7da7..247900e 100644
--- a/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
+++ b/parquet-column/src/main/java/parquet/io/RecordReaderImplementation.java
@@ -234,6 +234,8 @@ class RecordReaderImplementation<T> extends RecordReader<T> {
   private State[] states;
   private ColumnReader[] columnReaders;
 
+  private boolean shouldSkipCurrentRecord = false;
+
   /**
    * @param root the root of the schema
    * @param recordMaterializer responsible of materializing the records
@@ -411,7 +413,17 @@ class RecordReaderImplementation<T> extends RecordReader<T> {
       currentState = currentState.nextState[nextR];
     } while (currentState != null);
     recordRootConverter.end();
-    return recordMaterializer.getCurrentRecord();
+    T record = recordMaterializer.getCurrentRecord();
+    shouldSkipCurrentRecord = record == null;
+    if (shouldSkipCurrentRecord) {
+      recordMaterializer.skipCurrentRecord();
+    }
+    return record;
+  }
+
+  @Override
+  public boolean shouldSkipCurrentRecord() {
+    return shouldSkipCurrentRecord;
   }
 
   private static void log(String string) {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/io/api/Binary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/api/Binary.java b/parquet-column/src/main/java/parquet/io/api/Binary.java
index 1ef23fb..432f075 100644
--- a/parquet-column/src/main/java/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/parquet/io/api/Binary.java
@@ -15,11 +15,11 @@
  */
 package parquet.io.api;
 
-import static parquet.bytes.BytesUtils.UTF8;
-
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.ObjectStreamException;
 import java.io.OutputStream;
+import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -27,230 +27,325 @@ import java.util.Arrays;
 import parquet.bytes.BytesUtils;
 import parquet.io.ParquetEncodingException;
 
-abstract public class Binary {
+import static parquet.bytes.BytesUtils.UTF8;
+
+abstract public class Binary implements Comparable<Binary>, Serializable {
+
+  // this isn't really something others should extend
+  private Binary() { }
 
   public static final Binary EMPTY = fromByteArray(new byte[0]);
 
-  public static Binary fromByteArray(
-      final byte[] value,
-      final int offset,
-      final int length) {
-
-    return new Binary() {
-      @Override
-      public String toStringUsingUTF8() {
-        return UTF8.decode(ByteBuffer.wrap(value, offset, length)).toString();
-        // TODO: figure out why the following line was much slower
-        // rdb: new String(...) is slower because it instantiates a new Decoder,
-        //      while Charset#decode uses a thread-local decoder cache
-        // return new String(value, offset, length, BytesUtils.UTF8);
-      }
+  abstract public String toStringUsingUTF8();
 
-      @Override
-      public int length() {
-        return length;
-      }
+  abstract public int length();
 
-      @Override
-      public void writeTo(OutputStream out) throws IOException {
-        out.write(value, offset, length);
-      }
+  abstract public void writeTo(OutputStream out) throws IOException;
 
-      @Override
-      public byte[] getBytes() {
-        return Arrays.copyOfRange(value, offset, offset + length);
-      }
+  abstract public void writeTo(DataOutput out) throws IOException;
 
-      @Override
-      public int hashCode() {
-        return Binary.hashCode(value, offset, length);
-      }
+  abstract public byte[] getBytes();
 
-      @Override
-      boolean equals(Binary other) {
-        return other.equals(value, offset, length);
-      }
+  abstract boolean equals(byte[] bytes, int offset, int length);
 
-      @Override
-      boolean equals(byte[] other, int otherOffset, int otherLength) {
-        return Binary.equals(value, offset, length, other, otherOffset, otherLength);
-      }
+  abstract boolean equals(Binary other);
 
-      @Override
-      public int compareTo(Binary other) {
-        return other.compareTo(value, offset, length);
-      }
+  abstract public int compareTo(Binary other);
 
-      @Override
-      int compareTo(byte[] other, int otherOffset, int otherLength) {
-        return Binary.compareTwoByteArrays(value, offset, length, other, otherOffset, otherLength);
-      }
+  abstract int compareTo(byte[] bytes, int offset, int length);
 
-      @Override
-      public ByteBuffer toByteBuffer() {
-        return ByteBuffer.wrap(value, offset, length);
-      }
+  abstract public ByteBuffer toByteBuffer();
 
-      @Override
-      public void writeTo(DataOutput out) throws IOException {
-        out.write(value, offset, length);
-      }
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (obj instanceof Binary) {
+      return equals((Binary)obj);
+    }
+    return false;
+  }
 
-    };
+  @Override
+  public String toString() {
+    return "Binary{" + length() + " bytes, " + Arrays.toString(getBytes()) + "}";
   }
 
-  public static Binary fromByteArray(final byte[] value) {
-    return new Binary() {
-      @Override
-      public String toStringUsingUTF8() {
-        return new String(value, BytesUtils.UTF8);
-      }
+  private static class ByteArraySliceBackedBinary extends Binary {
+    private final byte[] value;
+    private final int offset;
+    private final int length;
 
-      @Override
-      public int length() {
-        return value.length;
-      }
+    public ByteArraySliceBackedBinary(byte[] value, int offset, int length) {
+      this.value = value;
+      this.offset = offset;
+      this.length = length;
+    }
 
-      @Override
-      public void writeTo(OutputStream out) throws IOException {
-        out.write(value);
-      }
+    @Override
+    public String toStringUsingUTF8() {
+      return UTF8.decode(ByteBuffer.wrap(value, offset, length)).toString();
+      // TODO: figure out why the following line was much slower
+      // rdb: new String(...) is slower because it instantiates a new Decoder,
+      //      while Charset#decode uses a thread-local decoder cache
+      // return new String(value, offset, length, BytesUtils.UTF8);
+    }
 
-      @Override
-      public byte[] getBytes() {
-        return value;
-      }
+    @Override
+    public int length() {
+      return length;
+    }
 
-      @Override
-      public int hashCode() {
-        return Binary.hashCode(value, 0, value.length);
-      }
+    @Override
+    public void writeTo(OutputStream out) throws IOException {
+      out.write(value, offset, length);
+    }
 
-      @Override
-      boolean equals(Binary other) {
-        return other.equals(value, 0, value.length);
-      }
+    @Override
+    public byte[] getBytes() {
+      return Arrays.copyOfRange(value, offset, offset + length);
+    }
 
-      @Override
-      boolean equals(byte[] other, int otherOffset, int otherLength) {
-        return Binary.equals(value, 0, value.length, other, otherOffset, otherLength);
-      }
+    @Override
+    public int hashCode() {
+      return Binary.hashCode(value, offset, length);
+    }
 
-      @Override
-      public int compareTo(Binary other) {
-        return other.compareTo(value, 0, value.length);
-      }
+    @Override
+    boolean equals(Binary other) {
+      return other.equals(value, offset, length);
+    }
 
-      @Override
-      int compareTo(byte[] other, int otherOffset, int otherLength) {
-        return Binary.compareTwoByteArrays(value, 0, value.length, other, otherOffset, otherLength);
-      }
+    @Override
+    boolean equals(byte[] other, int otherOffset, int otherLength) {
+      return Binary.equals(value, offset, length, other, otherOffset, otherLength);
+    }
 
-      @Override
-      public ByteBuffer toByteBuffer() {
-        return ByteBuffer.wrap(value);
-      }
+    @Override
+    public int compareTo(Binary other) {
+      return other.compareTo(value, offset, length);
+    }
+
+    @Override
+    int compareTo(byte[] other, int otherOffset, int otherLength) {
+      return Binary.compareTwoByteArrays(value, offset, length, other, otherOffset, otherLength);
+    }
+
+    @Override
+    public ByteBuffer toByteBuffer() {
+      return ByteBuffer.wrap(value, offset, length);
+    }
+
+    @Override
+    public void writeTo(DataOutput out) throws IOException {
+      out.write(value, offset, length);
+    }
 
-      @Override
-      public void writeTo(DataOutput out) throws IOException {
-        out.write(value);
-      }
-    };
   }
 
-  public static Binary fromByteBuffer(final ByteBuffer value) {
-    return new Binary() {
-      @Override
-      public String toStringUsingUTF8() {
-        return new String(getBytes(), BytesUtils.UTF8);
-      }
+  private static class FromStringBinary extends ByteArrayBackedBinary {
+    public FromStringBinary(byte[] value) {
+      super(value);
+    }
 
-      @Override
-      public int length() {
-        return value.remaining();
-      }
+    @Override
+    public String toString() {
+      return "Binary{\"" + toStringUsingUTF8() + "\"}";
+    }
+  }
 
-      @Override
-      public void writeTo(OutputStream out) throws IOException {
-        // TODO: should not have to materialize those bytes
-        out.write(getBytes());
-      }
+  public static Binary fromByteArray(final byte[] value, final int offset, final int length) {
+    return new ByteArraySliceBackedBinary(value, offset, length);
+  }
 
-      @Override
-      public byte[] getBytes() {
-        byte[] bytes = new byte[value.remaining()];
+  private static class ByteArrayBackedBinary extends Binary {
+    private final byte[] value;
 
-        value.mark();
-        value.get(bytes).reset();
-        return bytes;
-      }
+    public ByteArrayBackedBinary(byte[] value) {
+      this.value = value;
+    }
 
-      @Override
-      public int hashCode() {
-        if (value.hasArray()) {
-          return Binary.hashCode(value.array(), value.arrayOffset() + value.position(),
-              value.arrayOffset() + value.remaining());
-        }
-        byte[] bytes = getBytes();
-        return Binary.hashCode(bytes, 0, bytes.length);
-      }
+    @Override
+    public String toStringUsingUTF8() {
+      return new String(value, BytesUtils.UTF8);
+    }
 
-      @Override
-      boolean equals(Binary other) {
-        if (value.hasArray()) {
-          return other.equals(value.array(), value.arrayOffset() + value.position(),
-              value.arrayOffset() + value.remaining());
-        }
-        byte[] bytes = getBytes();
-        return other.equals(bytes, 0, bytes.length);
-      }
+    @Override
+    public int length() {
+      return value.length;
+    }
+
+    @Override
+    public void writeTo(OutputStream out) throws IOException {
+      out.write(value);
+    }
+
+    @Override
+    public byte[] getBytes() {
+      return value;
+    }
 
-      @Override
-      boolean equals(byte[] other, int otherOffset, int otherLength) {
-        if (value.hasArray()) {
-          return Binary.equals(value.array(), value.arrayOffset() + value.position(),
-              value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
-        }
-        byte[] bytes = getBytes();
-        return Binary.equals(bytes, 0, bytes.length, other, otherOffset, otherLength);
+    @Override
+    public int hashCode() {
+      return Binary.hashCode(value, 0, value.length);
+    }
+
+    @Override
+    boolean equals(Binary other) {
+      return other.equals(value, 0, value.length);
+    }
+
+    @Override
+    boolean equals(byte[] other, int otherOffset, int otherLength) {
+      return Binary.equals(value, 0, value.length, other, otherOffset, otherLength);
+    }
+
+    @Override
+    public int compareTo(Binary other) {
+      return other.compareTo(value, 0, value.length);
+    }
+
+    @Override
+    int compareTo(byte[] other, int otherOffset, int otherLength) {
+      return Binary.compareTwoByteArrays(value, 0, value.length, other, otherOffset, otherLength);
+    }
+
+    @Override
+    public ByteBuffer toByteBuffer() {
+      return ByteBuffer.wrap(value);
+    }
+
+    @Override
+    public void writeTo(DataOutput out) throws IOException {
+      out.write(value);
+    }
+
+  }
+
+  public static Binary fromByteArray(final byte[] value) {
+    return new ByteArrayBackedBinary(value);
+  }
+
+  private static class ByteBufferBackedBinary extends Binary {
+    private transient ByteBuffer value;
+
+    public ByteBufferBackedBinary(ByteBuffer value) {
+      this.value = value;
+    }
+
+    @Override
+    public String toStringUsingUTF8() {
+      return new String(getBytes(), BytesUtils.UTF8);
+    }
+
+    @Override
+    public int length() {
+      return value.remaining();
+    }
+
+    @Override
+    public void writeTo(OutputStream out) throws IOException {
+      // TODO: should not have to materialize those bytes
+      out.write(getBytes());
+    }
+
+    @Override
+    public byte[] getBytes() {
+      byte[] bytes = new byte[value.remaining()];
+
+      value.mark();
+      value.get(bytes).reset();
+      return bytes;
+    }
+
+    @Override
+    public int hashCode() {
+      if (value.hasArray()) {
+        return Binary.hashCode(value.array(), value.arrayOffset() + value.position(),
+            value.arrayOffset() + value.remaining());
       }
+      byte[] bytes = getBytes();
+      return Binary.hashCode(bytes, 0, bytes.length);
+    }
 
-      @Override
-      public int compareTo(Binary other) {
-        if (value.hasArray()) {
-          return other.compareTo(value.array(), value.arrayOffset() + value.position(),
-              value.arrayOffset() + value.remaining());
-        }
-        byte[] bytes = getBytes();
-        return other.compareTo(bytes, 0, bytes.length);
+    @Override
+    boolean equals(Binary other) {
+      if (value.hasArray()) {
+        return other.equals(value.array(), value.arrayOffset() + value.position(),
+            value.arrayOffset() + value.remaining());
       }
+      byte[] bytes = getBytes();
+      return other.equals(bytes, 0, bytes.length);
+    }
 
-      @Override
-      int compareTo(byte[] other, int otherOffset, int otherLength) {
-        if (value.hasArray()) {
-          return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + value.position(),
-              value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
-        }
-        byte[] bytes = getBytes();
-        return Binary.compareTwoByteArrays(bytes, 0, bytes.length, other, otherOffset, otherLength);
+    @Override
+    boolean equals(byte[] other, int otherOffset, int otherLength) {
+      if (value.hasArray()) {
+        return Binary.equals(value.array(), value.arrayOffset() + value.position(),
+            value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
       }
+      byte[] bytes = getBytes();
+      return Binary.equals(bytes, 0, bytes.length, other, otherOffset, otherLength);
+    }
 
-      @Override
-      public ByteBuffer toByteBuffer() {
-        return value;
+    @Override
+    public int compareTo(Binary other) {
+      if (value.hasArray()) {
+        return other.compareTo(value.array(), value.arrayOffset() + value.position(),
+            value.arrayOffset() + value.remaining());
       }
+      byte[] bytes = getBytes();
+      return other.compareTo(bytes, 0, bytes.length);
+    }
 
-      @Override
-      public void writeTo(DataOutput out) throws IOException {
-        // TODO: should not have to materialize those bytes
-        out.write(getBytes());
+    @Override
+    int compareTo(byte[] other, int otherOffset, int otherLength) {
+      if (value.hasArray()) {
+        return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + value.position(),
+            value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
       }
-    };
+      byte[] bytes = getBytes();
+      return Binary.compareTwoByteArrays(bytes, 0, bytes.length, other, otherOffset, otherLength);
+    }
+
+    @Override
+    public ByteBuffer toByteBuffer() {
+      return value;
+    }
+
+    @Override
+    public void writeTo(DataOutput out) throws IOException {
+      // TODO: should not have to materialize those bytes
+      out.write(getBytes());
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
+      byte[] bytes = getBytes();
+      out.writeInt(bytes.length);
+      out.write(bytes);
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+      int length = in.readInt();
+      byte[] bytes = new byte[length];
+      in.readFully(bytes, 0, length);
+      this.value = ByteBuffer.wrap(bytes);
+    }
+
+    private void readObjectNoData() throws ObjectStreamException {
+      this.value = ByteBuffer.wrap(new byte[0]);
+    }
+
+  }
+
+  public static Binary fromByteBuffer(final ByteBuffer value) {
+    return new ByteBufferBackedBinary(value);
   }
 
   public static Binary fromString(final String value) {
     try {
-      return fromByteArray(value.getBytes("UTF-8"));
+      return new FromStringBinary(value.getBytes("UTF-8"));
     } catch (UnsupportedEncodingException e) {
       throw new ParquetEncodingException("UTF-8 not supported.", e);
     }
@@ -313,39 +408,4 @@ abstract public class Binary {
     else if (length1 < length2) { return 1;}
     else { return -1; }
   }
-
-  abstract public String toStringUsingUTF8();
-
-  abstract public int length();
-
-  abstract public void writeTo(OutputStream out) throws IOException;
-
-  abstract public void writeTo(DataOutput out) throws IOException;
-
-  abstract public byte[] getBytes();
-
-  abstract boolean equals(byte[] bytes, int offset, int length);
-
-  abstract boolean equals(Binary other);
-
-  abstract public int compareTo(Binary other);
-
-  abstract int compareTo(byte[] bytes, int offset, int length);
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == null) {
-      return false;
-    }
-    if (obj instanceof Binary) {
-      return equals((Binary)obj);
-    }
-    return false;
-  }
-
-  abstract public ByteBuffer toByteBuffer();
-
-  public String toString() {
-    return "Binary{" + length() + " bytes, " + Arrays.toString(getBytes()) + "}";
-  };
 }
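
Since Binary is now Serializable (with explicit hooks for the ByteBuffer-backed
variant), a hedged round-trip sketch:

    Binary original = Binary.fromString("hello");
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(baos);
    oos.writeObject(original);
    oos.close();
    Binary copy = (Binary) new ObjectInputStream(
        new ByteArrayInputStream(baos.toByteArray())).readObject();
    // copy.equals(original) holds: equals(Object) compares byte contents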

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/io/api/RecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/io/api/RecordMaterializer.java b/parquet-column/src/main/java/parquet/io/api/RecordMaterializer.java
index 7d90c6a..7ff0f1c 100644
--- a/parquet-column/src/main/java/parquet/io/api/RecordMaterializer.java
+++ b/parquet-column/src/main/java/parquet/io/api/RecordMaterializer.java
@@ -34,6 +34,11 @@ abstract public class RecordMaterializer<T> {
   abstract public T getCurrentRecord();
 
   /**
+   * Called if {@link #getCurrentRecord()} isn't going to be called.
+   */
+  public void skipCurrentRecord() { }
+
+  /**
    * @return the root converter for this tree
    */
   abstract public GroupConverter getRootConverter();

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/predicate/DummyUdp.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/DummyUdp.java b/parquet-column/src/test/java/parquet/filter2/predicate/DummyUdp.java
new file mode 100644
index 0000000..277fa43
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/predicate/DummyUdp.java
@@ -0,0 +1,19 @@
+package parquet.filter2.predicate;
+
+public class DummyUdp extends UserDefinedPredicate<Integer> {
+
+  @Override
+  public boolean keep(Integer value) {
+    return false;
+  }
+
+  @Override
+  public boolean canDrop(Statistics<Integer> statistics) {
+    return false;
+  }
+
+  @Override
+  public boolean inverseCanDrop(Statistics<Integer> statistics) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/predicate/TestFilterApiMethods.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/TestFilterApiMethods.java b/parquet-column/src/test/java/parquet/filter2/predicate/TestFilterApiMethods.java
new file mode 100644
index 0000000..dafd7fd
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/predicate/TestFilterApiMethods.java
@@ -0,0 +1,103 @@
+package parquet.filter2.predicate;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.junit.Test;
+
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.predicate.Operators.And;
+import parquet.filter2.predicate.Operators.BinaryColumn;
+import parquet.filter2.predicate.Operators.DoubleColumn;
+import parquet.filter2.predicate.Operators.Eq;
+import parquet.filter2.predicate.Operators.Gt;
+import parquet.filter2.predicate.Operators.IntColumn;
+import parquet.filter2.predicate.Operators.Not;
+import parquet.filter2.predicate.Operators.Or;
+import parquet.filter2.predicate.Operators.UserDefined;
+import parquet.io.api.Binary;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.binaryColumn;
+import static parquet.filter2.predicate.FilterApi.doubleColumn;
+import static parquet.filter2.predicate.FilterApi.eq;
+import static parquet.filter2.predicate.FilterApi.gt;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.not;
+import static parquet.filter2.predicate.FilterApi.notEq;
+import static parquet.filter2.predicate.FilterApi.or;
+import static parquet.filter2.predicate.FilterApi.userDefined;
+import static parquet.filter2.predicate.Operators.NotEq;
+
+public class TestFilterApiMethods {
+
+  private static final IntColumn intColumn = intColumn("a.b.c");
+  private static final DoubleColumn doubleColumn = doubleColumn("x.y.z");
+  private static final BinaryColumn binColumn = binaryColumn("a.string.column");
+
+  private static final FilterPredicate predicate =
+      and(not(or(eq(intColumn, 7), notEq(intColumn, 17))), gt(doubleColumn, 100.0));
+
+  @Test
+  public void testFilterPredicateCreation() {
+    FilterPredicate outerAnd = predicate;
+
+    assertTrue(outerAnd instanceof And);
+
+    FilterPredicate not = ((And) outerAnd).getLeft();
+    FilterPredicate gt = ((And) outerAnd).getRight();
+    assertTrue(not instanceof Not);
+
+    FilterPredicate or = ((Not) not).getPredicate();
+    assertTrue(or instanceof Or);
+
+    FilterPredicate leftEq = ((Or) or).getLeft();
+    FilterPredicate rightNotEq = ((Or) or).getRight();
+    assertTrue(leftEq instanceof Eq);
+    assertTrue(rightNotEq instanceof NotEq);
+    assertEquals(7, ((Eq) leftEq).getValue());
+    assertEquals(17, ((NotEq) rightNotEq).getValue());
+    assertEquals(ColumnPath.get("a", "b", "c"), ((Eq) leftEq).getColumn().getColumnPath());
+    assertEquals(ColumnPath.get("a", "b", "c"), ((NotEq) rightNotEq).getColumn().getColumnPath());
+
+    assertTrue(gt instanceof Gt);
+    assertEquals(100.0, ((Gt) gt).getValue());
+    assertEquals(ColumnPath.get("x", "y", "z"), ((Gt) gt).getColumn().getColumnPath());
+  }
+
+  @Test
+  public void testToString() {
+    FilterPredicate pred = or(predicate, notEq(binColumn, Binary.fromString("foobarbaz")));
+    assertEquals("or(and(not(or(eq(a.b.c, 7), noteq(a.b.c, 17))), gt(x.y.z, 100.0)), "
+        + "noteq(a.string.column, Binary{\"foobarbaz\"}))",
+        pred.toString());
+  }
+
+  @Test
+  public void testUdp() {
+    FilterPredicate predicate = or(eq(doubleColumn, 12.0), userDefined(intColumn, DummyUdp.class));
+    assertTrue(predicate instanceof Or);
+    FilterPredicate ud = ((Or) predicate).getRight();
+    assertTrue(ud instanceof UserDefined);
+    assertEquals(DummyUdp.class, ((UserDefined) ud).getUserDefinedPredicateClass());
+    assertTrue(((UserDefined) ud).getUserDefinedPredicate() instanceof DummyUdp);
+  }
+
+  @Test
+  public void testSerializable() throws Exception {
+    BinaryColumn binary = binaryColumn("foo");
+    FilterPredicate p = or(and(userDefined(intColumn, DummyUdp.class), predicate), eq(binary, Binary.fromString("hi")));
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ObjectOutputStream oos = new ObjectOutputStream(baos);
+    oos.writeObject(p);
+    oos.close();
+
+    ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray()));
+    FilterPredicate read = (FilterPredicate) is.readObject();
+    assertEquals(p, read);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverseRewriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverseRewriter.java b/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverseRewriter.java
new file mode 100644
index 0000000..0aa360b
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverseRewriter.java
@@ -0,0 +1,85 @@
+package parquet.filter2.predicate;
+
+import org.junit.Test;
+
+import parquet.filter2.predicate.Operators.DoubleColumn;
+import parquet.filter2.predicate.Operators.IntColumn;
+import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import parquet.filter2.predicate.Operators.UserDefined;
+
+import static org.junit.Assert.assertEquals;
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.doubleColumn;
+import static parquet.filter2.predicate.FilterApi.eq;
+import static parquet.filter2.predicate.FilterApi.gt;
+import static parquet.filter2.predicate.FilterApi.gtEq;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.lt;
+import static parquet.filter2.predicate.FilterApi.ltEq;
+import static parquet.filter2.predicate.FilterApi.not;
+import static parquet.filter2.predicate.FilterApi.notEq;
+import static parquet.filter2.predicate.FilterApi.or;
+import static parquet.filter2.predicate.FilterApi.userDefined;
+import static parquet.filter2.predicate.LogicalInverseRewriter.rewrite;
+
+public class TestLogicalInverseRewriter {
+  private static final IntColumn intColumn = intColumn("a.b.c");
+  private static final DoubleColumn doubleColumn = doubleColumn("a.b.c");
+
+  private static final FilterPredicate complex =
+      and(
+          not(
+              or(ltEq(doubleColumn, 12.0),
+                  and(
+                      not(or(eq(intColumn, 7), notEq(intColumn, 17))),
+                      userDefined(intColumn, DummyUdp.class)))),
+          or(gt(doubleColumn, 100.0), not(gtEq(intColumn, 77))));
+
+  private static final FilterPredicate complexCollapsed =
+      and(
+          and(gt(doubleColumn, 12.0),
+              or(
+                  or(eq(intColumn, 7), notEq(intColumn, 17)),
+                  new LogicalNotUserDefined<Integer, DummyUdp>(userDefined(intColumn, DummyUdp.class)))),
+          or(gt(doubleColumn, 100.0), lt(intColumn, 77)));
+
+  private static void assertNoOp(FilterPredicate p) {
+    assertEquals(p, rewrite(p));
+  }
+
+  @Test
+  public void testBaseCases() {
+    UserDefined<Integer, DummyUdp> ud = userDefined(intColumn, DummyUdp.class);
+
+    assertNoOp(eq(intColumn, 17));
+    assertNoOp(notEq(intColumn, 17));
+    assertNoOp(lt(intColumn, 17));
+    assertNoOp(ltEq(intColumn, 17));
+    assertNoOp(gt(intColumn, 17));
+    assertNoOp(gtEq(intColumn, 17));
+    assertNoOp(and(eq(intColumn, 17), eq(doubleColumn, 12.0)));
+    assertNoOp(or(eq(intColumn, 17), eq(doubleColumn, 12.0)));
+    assertNoOp(ud);
+
+    assertEquals(notEq(intColumn, 17), rewrite(not(eq(intColumn, 17))));
+    assertEquals(eq(intColumn, 17), rewrite(not(notEq(intColumn, 17))));
+    assertEquals(gtEq(intColumn, 17), rewrite(not(lt(intColumn, 17))));
+    assertEquals(gt(intColumn, 17), rewrite(not(ltEq(intColumn, 17))));
+    assertEquals(ltEq(intColumn, 17), rewrite(not(gt(intColumn, 17))));
+    assertEquals(lt(intColumn, 17), rewrite(not(gtEq(intColumn, 17))));
+    assertEquals(new LogicalNotUserDefined<Integer, DummyUdp>(ud), rewrite(not(ud)));
+
+    FilterPredicate notedAnd = not(and(eq(intColumn, 17), eq(doubleColumn, 12.0)));
+    FilterPredicate distributedAnd = or(notEq(intColumn, 17), notEq(doubleColumn, 12.0));
+    assertEquals(distributedAnd, rewrite(notedAnd));
+
+    FilterPredicate andWithNots = and(not(gtEq(intColumn, 17)), lt(intColumn, 7));
+    FilterPredicate andWithoutNots = and(lt(intColumn, 17), lt(intColumn, 7));
+    assertEquals(andWithoutNots, rewrite(andWithNots));
+  }
+
+  @Test
+  public void testComplex() {
+    assertEquals(complexCollapsed, rewrite(complex));
+  }
+}
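
What rewrite() is exercising above: it removes every not() by applying De
Morgan's laws and replacing each leaf comparison with its logical inverse
(not(lt) becomes gtEq, not(eq) becomes notEq, and so on); only user-defined
predicates keep a wrapper, LogicalNotUserDefined. A minimal sketch using
only the FilterApi methods imported in the test (the class name is
illustrative):

    import parquet.filter2.predicate.FilterPredicate;
    import parquet.filter2.predicate.Operators.IntColumn;

    import static parquet.filter2.predicate.FilterApi.and;
    import static parquet.filter2.predicate.FilterApi.eq;
    import static parquet.filter2.predicate.FilterApi.intColumn;
    import static parquet.filter2.predicate.FilterApi.not;
    import static parquet.filter2.predicate.FilterApi.notEq;
    import static parquet.filter2.predicate.FilterApi.or;
    import static parquet.filter2.predicate.LogicalInverseRewriter.rewrite;

    public class RewriteSketch {
      public static void main(String[] args) {
        IntColumn c = intColumn("a.b.c");

        // not(and(x, y)) becomes or(not(x), not(y)), then the leaf not()s
        // collapse: not(eq) -> notEq, not(notEq) -> eq
        FilterPredicate collapsed = rewrite(not(and(eq(c, 7), notEq(c, 17))));

        // prints true: the rewritten tree contains no Not nodes
        System.out.println(collapsed.equals(or(notEq(c, 7), eq(c, 17))));
      }
    }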

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverter.java b/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverter.java
new file mode 100644
index 0000000..19e6b68
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/predicate/TestLogicalInverter.java
@@ -0,0 +1,75 @@
+package parquet.filter2.predicate;
+
+import org.junit.Test;
+
+import parquet.filter2.predicate.Operators.DoubleColumn;
+import parquet.filter2.predicate.Operators.IntColumn;
+import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import parquet.filter2.predicate.Operators.UserDefined;
+
+import static org.junit.Assert.assertEquals;
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.doubleColumn;
+import static parquet.filter2.predicate.FilterApi.eq;
+import static parquet.filter2.predicate.FilterApi.gt;
+import static parquet.filter2.predicate.FilterApi.gtEq;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.lt;
+import static parquet.filter2.predicate.FilterApi.ltEq;
+import static parquet.filter2.predicate.FilterApi.not;
+import static parquet.filter2.predicate.FilterApi.notEq;
+import static parquet.filter2.predicate.FilterApi.or;
+import static parquet.filter2.predicate.FilterApi.userDefined;
+import static parquet.filter2.predicate.LogicalInverter.invert;
+
+public class TestLogicalInverter {
+  private static final IntColumn intColumn = intColumn("a.b.c");
+  private static final DoubleColumn doubleColumn = doubleColumn("a.b.c");
+
+  private static final UserDefined<Integer, DummyUdp> ud = userDefined(intColumn, DummyUdp.class);
+
+  private static final FilterPredicate complex =
+      and(
+          or(ltEq(doubleColumn, 12.0),
+              and(
+                  not(or(eq(intColumn, 7), notEq(intColumn, 17))),
+                  userDefined(intColumn, DummyUdp.class))),
+          or(gt(doubleColumn, 100.0), notEq(intColumn, 77)));
+
+  private static final FilterPredicate complexInverse =
+      or(
+          and(gt(doubleColumn, 12.0),
+              or(
+                  or(eq(intColumn, 7), notEq(intColumn, 17)),
+                  new LogicalNotUserDefined<Integer, DummyUdp>(userDefined(intColumn, DummyUdp.class)))),
+          and(ltEq(doubleColumn, 100.0), eq(intColumn, 77)));
+
+  @Test
+  public void testBaseCases() {
+    assertEquals(notEq(intColumn, 17), invert(eq(intColumn, 17)));
+    assertEquals(eq(intColumn, 17), invert(notEq(intColumn, 17)));
+    assertEquals(gtEq(intColumn, 17), invert(lt(intColumn, 17)));
+    assertEquals(gt(intColumn, 17), invert(ltEq(intColumn, 17)));
+    assertEquals(ltEq(intColumn, 17), invert(gt(intColumn, 17)));
+    assertEquals(lt(intColumn, 17), invert(gtEq(intColumn, 17)));
+
+    FilterPredicate andPos = and(eq(intColumn, 17), eq(doubleColumn, 12.0));
+    FilterPredicate andInv = or(notEq(intColumn, 17), notEq(doubleColumn, 12.0));
+    assertEquals(andInv, invert(andPos));
+
+    FilterPredicate orPos = or(eq(intColumn, 17), eq(doubleColumn, 12.0));
+    FilterPredicate orInv = and(notEq(intColumn, 17), notEq(doubleColumn, 12.0));
+    assertEquals(orInv, invert(orPos));
+
+    assertEquals(eq(intColumn, 17), invert(not(eq(intColumn, 17))));
+
+    assertEquals(new LogicalNotUserDefined<Integer, DummyUdp>(ud), invert(ud));
+    assertEquals(ud, invert(not(ud)));
+    assertEquals(ud, invert(new LogicalNotUserDefined<Integer, DummyUdp>(ud)));
+  }
+
+  @Test
+  public void testComplex() {
+    assertEquals(complexInverse, invert(complex));
+  }
+}
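
LogicalInverter.invert produces the logical negation of a predicate
directly, with no Not node left in the result, and as the base cases above
imply it is its own inverse: applying it twice round-trips to the original
tree. A small sketch under those assumptions (the class name is
illustrative):

    import parquet.filter2.predicate.FilterPredicate;
    import parquet.filter2.predicate.Operators.DoubleColumn;

    import static parquet.filter2.predicate.FilterApi.doubleColumn;
    import static parquet.filter2.predicate.FilterApi.gt;
    import static parquet.filter2.predicate.FilterApi.lt;
    import static parquet.filter2.predicate.FilterApi.or;
    import static parquet.filter2.predicate.LogicalInverter.invert;

    public class InvertSketch {
      public static void main(String[] args) {
        DoubleColumn d = doubleColumn("x.y.z");

        // "outside the interval [1.0, 9.0]" ...
        FilterPredicate outside = or(lt(d, 1.0), gt(d, 9.0));

        // ... inverts to "inside it": and(gtEq(d, 1.0), ltEq(d, 9.0))
        FilterPredicate inside = invert(outside);

        // prints true: invert is an involution
        System.out.println(outside.equals(invert(inside)));
      }
    }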

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java b/parquet-column/src/test/java/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
new file mode 100644
index 0000000..e9e745f
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/predicate/TestSchemaCompatibilityValidator.java
@@ -0,0 +1,124 @@
+package parquet.filter2.predicate;
+
+import org.junit.Test;
+
+import parquet.filter2.predicate.Operators.BinaryColumn;
+import parquet.filter2.predicate.Operators.IntColumn;
+import parquet.filter2.predicate.Operators.LongColumn;
+import parquet.io.api.Binary;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.binaryColumn;
+import static parquet.filter2.predicate.FilterApi.eq;
+import static parquet.filter2.predicate.FilterApi.gt;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.longColumn;
+import static parquet.filter2.predicate.FilterApi.ltEq;
+import static parquet.filter2.predicate.FilterApi.not;
+import static parquet.filter2.predicate.FilterApi.notEq;
+import static parquet.filter2.predicate.FilterApi.or;
+import static parquet.filter2.predicate.FilterApi.userDefined;
+import static parquet.filter2.predicate.SchemaCompatibilityValidator.validate;
+
+public class TestSchemaCompatibilityValidator {
+  private static final BinaryColumn stringC = binaryColumn("c");
+  private static final LongColumn longBar = longColumn("x.bar");
+  private static final IntColumn intBar = intColumn("x.bar");
+  private static final LongColumn lotsOfLongs = longColumn("lotsOfLongs");
+
+  private static final String schemaString =
+      "message Document {\n"
+          + "  required int32 a;\n"
+          + "  required binary b;\n"
+          + "  required binary c (UTF8);\n"
+          + "  required group x { required int32 bar; }\n"
+          + "  repeated int64 lotsOfLongs;\n"
+          + "}\n";
+
+  private static final MessageType schema = MessageTypeParser.parseMessageType(schemaString);
+
+  private static final FilterPredicate complexValid =
+      and(
+          or(ltEq(stringC, Binary.fromString("foo")),
+              and(
+                  not(or(eq(intBar, 17), notEq(intBar, 17))),
+                  userDefined(intBar, DummyUdp.class))),
+          or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz"))));
+
+  static class LongDummyUdp extends UserDefinedPredicate<Long> {
+    @Override
+    public boolean keep(Long value) {
+      return false;
+    }
+
+    @Override
+    public boolean canDrop(Statistics<Long> statistics) {
+      return false;
+    }
+
+    @Override
+    public boolean inverseCanDrop(Statistics<Long> statistics) {
+      return false;
+    }
+  }
+
+  private static final FilterPredicate complexWrongType =
+      and(
+          or(ltEq(stringC, Binary.fromString("foo")),
+              and(
+                  not(or(eq(longBar, 17L), notEq(longBar, 17L))),
+                  userDefined(longBar, LongDummyUdp.class))),
+          or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz"))));
+
+  private static final FilterPredicate complexMixedType =
+      and(
+          or(ltEq(stringC, Binary.fromString("foo")),
+              and(
+                  not(or(eq(intBar, 17), notEq(longBar, 17L))),
+                  userDefined(longBar, LongDummyUdp.class))),
+          or(gt(stringC, Binary.fromString("bar")), notEq(stringC, Binary.fromString("baz"))));
+
+  @Test
+  public void testValidType() {
+    validate(complexValid, schema);
+  }
+
+  @Test
+  public void testFindsInvalidTypes() {
+    try {
+      validate(complexWrongType, schema);
+      fail("this should throw");
+    } catch (IllegalArgumentException e) {
+      assertEquals("FilterPredicate column: x.bar's declared type (java.lang.Long) does not match the schema found in file metadata. "
+          + "Column x.bar is of type: FullTypeDescriptor(PrimitiveType: INT32, OriginalType: null)\n"
+          + "Valid types for this column are: [class java.lang.Integer]", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testTwiceDeclaredColumn() {
+    validate(eq(stringC, Binary.fromString("larry")), schema);
+
+    try {
+      validate(complexMixedType, schema);
+      fail("this should throw");
+    } catch (IllegalArgumentException e) {
+      assertEquals("Column: x.bar was provided with different types in the same predicate. Found both: (class java.lang.Integer, class java.lang.Long)", e.getMessage());
+    }
+
+  }
+
+  @Test
+  public void testRepeatedNotSupported() {
+    try {
+      validate(eq(lotsOfLongs, 10L), schema);
+      fail("this should throw");
+    } catch (IllegalArgumentException e) {
+      assertEquals("FilterPredicates do not currently support repeated columns. Column lotsOfLongs is repeated.", e.getMessage());
+    }
+  }
+}
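
The validator above is the guard an application would run before scanning a
file: it checks that every column the predicate references exists in the
file schema with a compatible type, and throws IllegalArgumentException
otherwise. A minimal sketch of that pre-read check, assuming validate is
callable from application code as it is from these same-package tests (the
schema string and class name are illustrative):

    import parquet.filter2.predicate.FilterPredicate;
    import parquet.schema.MessageType;
    import parquet.schema.MessageTypeParser;

    import static parquet.filter2.predicate.FilterApi.eq;
    import static parquet.filter2.predicate.FilterApi.intColumn;
    import static parquet.filter2.predicate.FilterApi.longColumn;
    import static parquet.filter2.predicate.SchemaCompatibilityValidator.validate;

    public class ValidateSketch {
      public static void main(String[] args) {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message Document { required int32 a; }");

        // passes silently: an int column against an int32 field
        validate(eq(intColumn("a"), 42), schema);

        // rejected: a long column does not match an int32 field
        try {
          validate(eq(longColumn("a"), 42L), schema);
        } catch (IllegalArgumentException e) {
          System.out.println("rejected: " + e.getMessage());
        }
      }
    }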

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/predicate/TestValidTypeMap.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/predicate/TestValidTypeMap.java b/parquet-column/src/test/java/parquet/filter2/predicate/TestValidTypeMap.java
new file mode 100644
index 0000000..07f2597
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/predicate/TestValidTypeMap.java
@@ -0,0 +1,93 @@
+package parquet.filter2.predicate;
+
+import org.junit.Test;
+
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.predicate.Operators.BinaryColumn;
+import parquet.filter2.predicate.Operators.BooleanColumn;
+import parquet.filter2.predicate.Operators.Column;
+import parquet.filter2.predicate.Operators.DoubleColumn;
+import parquet.filter2.predicate.Operators.FloatColumn;
+import parquet.filter2.predicate.Operators.IntColumn;
+import parquet.filter2.predicate.Operators.LongColumn;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static parquet.filter2.predicate.FilterApi.binaryColumn;
+import static parquet.filter2.predicate.FilterApi.booleanColumn;
+import static parquet.filter2.predicate.FilterApi.doubleColumn;
+import static parquet.filter2.predicate.FilterApi.floatColumn;
+import static parquet.filter2.predicate.FilterApi.intColumn;
+import static parquet.filter2.predicate.FilterApi.longColumn;
+import static parquet.filter2.predicate.ValidTypeMap.assertTypeValid;
+
+public class TestValidTypeMap {
+  public static IntColumn intColumn = intColumn("int.column");
+  public static LongColumn longColumn = longColumn("long.column");
+  public static FloatColumn floatColumn = floatColumn("float.column");
+  public static DoubleColumn doubleColumn = doubleColumn("double.column");
+  public static BooleanColumn booleanColumn = booleanColumn("boolean.column");
+  public static BinaryColumn binaryColumn = binaryColumn("binary.column");
+
+  private static class InvalidColumnType implements Comparable<InvalidColumnType> {
+    @Override
+    public int compareTo(InvalidColumnType o) {
+      return 0;
+    }
+  }
+
+  public static Column<InvalidColumnType> invalidColumn =
+      new Column<InvalidColumnType>(ColumnPath.get("invalid.column"), InvalidColumnType.class) { };
+
+  @Test
+  public void testValidTypes() {
+    assertTypeValid(intColumn, PrimitiveTypeName.INT32, null);
+    assertTypeValid(longColumn, PrimitiveTypeName.INT64, null);
+    assertTypeValid(floatColumn, PrimitiveTypeName.FLOAT, null);
+    assertTypeValid(doubleColumn, PrimitiveTypeName.DOUBLE, null);
+    assertTypeValid(booleanColumn, PrimitiveTypeName.BOOLEAN, null);
+    assertTypeValid(binaryColumn, PrimitiveTypeName.BINARY, null);
+    assertTypeValid(binaryColumn, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, null);
+    assertTypeValid(binaryColumn, PrimitiveTypeName.BINARY, OriginalType.UTF8);
+    assertTypeValid(binaryColumn, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, OriginalType.UTF8);
+  }
+
+  @Test
+  public void testMismatchedTypes() {
+    try {
+      assertTypeValid(intColumn, PrimitiveTypeName.DOUBLE, null);
+      fail("This should throw!");
+    } catch (IllegalArgumentException e) {
+      assertEquals("FilterPredicate column: int.column's declared type (java.lang.Integer) does not match the "
+          + "schema found in file metadata. Column int.column is of type: "
+          + "FullTypeDescriptor(PrimitiveType: DOUBLE, OriginalType: null)\n"
+          + "Valid types for this column are: [class java.lang.Double]", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testUnsupportedType() {
+    try {
+      assertTypeValid(invalidColumn, PrimitiveTypeName.INT32, null);
+      fail("This should throw!");
+    } catch (IllegalArgumentException e) {
+      assertEquals("Column invalid.column was declared as type: "
+          + "parquet.filter2.predicate.TestValidTypeMap$InvalidColumnType which is not supported "
+          + "in FilterPredicates. Supported types for this column are: [class java.lang.Integer]", e.getMessage());
+    }
+
+    try {
+      assertTypeValid(invalidColumn, PrimitiveTypeName.INT32, OriginalType.UTF8);
+      fail("This should throw!");
+    } catch (IllegalArgumentException e) {
+      assertEquals("Column invalid.column was declared as type: "
+          + "parquet.filter2.predicate.TestValidTypeMap$InvalidColumnType which is not supported "
+          + "in FilterPredicates. There are no supported types for columns of FullTypeDescriptor(PrimitiveType: INT32, OriginalType: UTF8)",
+          e.getMessage());
+    }
+
+  }
+
+}
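
One detail worth pulling out of testValidTypes: Binary is the only column
type above with more than one legal physical representation, matching both
BINARY and FIXED_LEN_BYTE_ARRAY, with or without the UTF8 original type. A
sketch restating just that mapping (same-package placement is assumed so
that assertTypeValid stays reachable, mirroring the tests; a normal return
means the pairing is legal):

    package parquet.filter2.predicate;

    import parquet.schema.OriginalType;
    import parquet.schema.PrimitiveType.PrimitiveTypeName;

    import static parquet.filter2.predicate.FilterApi.binaryColumn;
    import static parquet.filter2.predicate.ValidTypeMap.assertTypeValid;

    // Every pairing below returns normally; a mismatched pairing for
    // this column (e.g. INT32) would throw IllegalArgumentException.
    public class BinaryShapesSketch {
      public static void main(String[] args) {
        assertTypeValid(binaryColumn("b"), PrimitiveTypeName.BINARY, null);
        assertTypeValid(binaryColumn("b"), PrimitiveTypeName.BINARY, OriginalType.UTF8);
        assertTypeValid(binaryColumn("b"), PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, null);
        assertTypeValid(binaryColumn("b"), PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, OriginalType.UTF8);
      }
    }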

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateEvaluator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateEvaluator.java b/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateEvaluator.java
new file mode 100644
index 0000000..08b7a04
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateEvaluator.java
@@ -0,0 +1,190 @@
+package parquet.filter2.recordlevel;
+
+import org.junit.Test;
+
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicateEvaluator.evaluate;
+
+public class TestIncrementallyUpdatedFilterPredicateEvaluator {
+
+  public static class ShortCircuitException extends RuntimeException {
+    public ShortCircuitException() {
+      super("this was supposed to short circuit and never get here!");
+    }
+  }
+
+  public static ValueInspector intIsNull() {
+    return new ValueInspector() {
+      @Override
+      public void updateNull() {
+        setResult(true);
+      }
+
+      @Override
+      public void update(int value) {
+        setResult(false);
+      }
+    };
+  }
+
+  public static ValueInspector intIsEven() {
+    return new ValueInspector() {
+      @Override
+      public void updateNull() {
+        setResult(false);
+      }
+
+      @Override
+      public void update(int value) {
+        setResult(value % 2 == 0);
+      }
+    };
+  }
+
+  public static ValueInspector doubleMoreThan10() {
+    return new ValueInspector() {
+      @Override
+      public void updateNull() {
+        setResult(false);
+      }
+
+      @Override
+      public void update(double value) {
+        setResult(value > 10.0);
+      }
+    };
+  }
+
+  @Test
+  public void testValueInspector() {
+    // known and false: 11 fails the isEven criteria; this inspector considers null false
+    ValueInspector v = intIsEven();
+    v.update(11);
+    assertFalse(evaluate(v));
+    v.reset();
+
+    // known and true: 12 meets the isEven criteria
+    v.update(12);
+    assertTrue(evaluate(v));
+    v.reset();
+
+    // known via a null value, which this inspector considers false
+    v.updateNull();
+    assertFalse(evaluate(v));
+    v.reset();
+
+    // known and false: 10 is non-null, and this inspector only matches nulls
+    ValueInspector intIsNull = intIsNull();
+    intIsNull.update(10);
+    assertFalse(evaluate(intIsNull));
+    intIsNull.reset();
+
+    // known via a null value, which this inspector considers true
+    intIsNull.updateNull();
+    assertTrue(evaluate(intIsNull));
+    intIsNull.reset();
+
+    // unknown, null considered false
+    v.reset();
+    assertFalse(evaluate(v));
+
+    // unknown, null considered true
+    intIsNull.reset();
+    assertTrue(evaluate(intIsNull));
+  }
+
+  private void doOrTest(ValueInspector v1, ValueInspector v2, int v1Value, int v2Value, boolean expected) {
+    v1.update(v1Value);
+    v2.update(v2Value);
+    IncrementallyUpdatedFilterPredicate or = new Or(v1, v2);
+    assertEquals(expected, evaluate(or));
+    v1.reset();
+    v2.reset();
+  }
+
+  private void doAndTest(ValueInspector v1, ValueInspector v2, int v1Value, int v2Value, boolean expected) {
+    v1.update(v1Value);
+    v2.update(v2Value);
+    IncrementallyUpdatedFilterPredicate and = new And(v1, v2);
+    assertEquals(expected, evaluate(and));
+    v1.reset();
+    v2.reset();
+  }
+
+  @Test
+  public void testOr() {
+    ValueInspector v1 = intIsEven();
+    ValueInspector v2 = intIsEven();
+
+    int F = 11;
+    int T = 12;
+
+    // F || F == F
+    doOrTest(v1, v2, F, F, false);
+    // F || T == T
+    doOrTest(v1, v2, F, T, true);
+    // T || F == T
+    doOrTest(v1, v2, T, F, true);
+    // T || T == T
+    doOrTest(v1, v2, T, T, true);
+
+  }
+
+  @Test
+  public void testAnd() {
+    ValueInspector v1 = intIsEven();
+    ValueInspector v2 = intIsEven();
+
+    int F = 11;
+    int T = 12;
+
+    // F && F == F
+    doAndTest(v1, v2, F, F, false);
+    // F && T == F
+    doAndTest(v1, v2, F, T, false);
+    // T && F == F
+    doAndTest(v1, v2, T, F, false);
+    // T && T == T
+    doAndTest(v1, v2, T, T, true);
+
+  }
+
+  @Test
+  public void testShortCircuit() {
+    ValueInspector neverCalled = new ValueInspector() {
+      @Override
+      public boolean accept(Visitor visitor) {
+        throw new ShortCircuitException();
+      }
+    };
+
+    try {
+      evaluate(neverCalled);
+      fail("this should throw");
+    } catch (ShortCircuitException e) {
+      //
+    }
+
+    // T || X should evaluate to true without inspecting X
+    ValueInspector v = intIsEven();
+    v.update(10);
+    IncrementallyUpdatedFilterPredicate or = new Or(v, neverCalled);
+    assertTrue(evaluate(or));
+    v.reset();
+
+    // F && X should evaluate to false without inspecting X
+    v.update(11);
+    IncrementallyUpdatedFilterPredicate and = new And(v, neverCalled);
+    assertFalse(evaluate(and));
+    v.reset();
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateResetter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateResetter.java b/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateResetter.java
new file mode 100644
index 0000000..974d6e7
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/recordlevel/TestIncrementallyUpdatedFilterPredicateResetter.java
@@ -0,0 +1,50 @@
+package parquet.filter2.recordlevel;
+
+import org.junit.Test;
+
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static parquet.filter2.recordlevel.TestIncrementallyUpdatedFilterPredicateEvaluator.doubleMoreThan10;
+import static parquet.filter2.recordlevel.TestIncrementallyUpdatedFilterPredicateEvaluator.intIsEven;
+import static parquet.filter2.recordlevel.TestIncrementallyUpdatedFilterPredicateEvaluator.intIsNull;
+
+public class TestIncrementallyUpdatedFilterPredicateResetter {
+  @Test
+  public void testReset() {
+
+    ValueInspector intIsNull = intIsNull();
+    ValueInspector intIsEven = intIsEven();
+    ValueInspector doubleMoreThan10 = doubleMoreThan10();
+
+    IncrementallyUpdatedFilterPredicate pred = new Or(intIsNull, new And(intIsEven, doubleMoreThan10));
+
+    intIsNull.updateNull();
+    intIsEven.update(11);
+    doubleMoreThan10.update(20.0D);
+
+    assertTrue(intIsNull.isKnown());
+    assertTrue(intIsEven.isKnown());
+    assertTrue(doubleMoreThan10.isKnown());
+
+    IncrementallyUpdatedFilterPredicateResetter.reset(pred);
+
+    assertFalse(intIsNull.isKnown());
+    assertFalse(intIsEven.isKnown());
+    assertFalse(doubleMoreThan10.isKnown());
+
+    intIsNull.updateNull();
+    assertTrue(intIsNull.isKnown());
+    assertFalse(intIsEven.isKnown());
+    assertFalse(doubleMoreThan10.isKnown());
+
+    IncrementallyUpdatedFilterPredicateResetter.reset(pred);
+    assertFalse(intIsNull.isKnown());
+    assertFalse(intIsEven.isKnown());
+    assertFalse(doubleMoreThan10.isKnown());
+
+  }
+}
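
Taken together with the evaluator test above, this is the intended
per-record cycle: converters feed values into the ValueInspectors during
record assembly, evaluate() decides whether the assembled record survives,
and reset() returns the whole predicate tree to the unknown state for the
next record. A sketch of that loop, reusing the intIsEven shape from the
evaluator test (placed in the same package so setResult stays reachable, as
in the tests; the class name and driving loop are illustrative):

    package parquet.filter2.recordlevel;

    import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;

    public class PerRecordCycleSketch {
      public static void main(String[] args) {
        // same shape as intIsEven() in the evaluator test
        ValueInspector isEven = new ValueInspector() {
          @Override
          public void updateNull() { setResult(false); }

          @Override
          public void update(int value) { setResult(value % 2 == 0); }
        };

        IncrementallyUpdatedFilterPredicate pred = isEven;

        for (int value : new int[] {1, 2, 3, 4}) {
          isEven.update(value); // normally driven by record assembly
          boolean keep = IncrementallyUpdatedFilterPredicateEvaluator.evaluate(pred);
          System.out.println(value + " -> " + (keep ? "keep" : "drop"));
          IncrementallyUpdatedFilterPredicateResetter.reset(pred); // back to unknown
        }
      }
    }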

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/filter2/recordlevel/TestValueInspector.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/filter2/recordlevel/TestValueInspector.java b/parquet-column/src/test/java/parquet/filter2/recordlevel/TestValueInspector.java
new file mode 100644
index 0000000..fc2e587
--- /dev/null
+++ b/parquet-column/src/test/java/parquet/filter2/recordlevel/TestValueInspector.java
@@ -0,0 +1,79 @@
+package parquet.filter2.recordlevel;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static parquet.filter2.recordlevel.TestIncrementallyUpdatedFilterPredicateEvaluator.intIsEven;
+
+public class TestValueInspector {
+
+  @Test
+  public void testLifeCycle() {
+    ValueInspector v = intIsEven();
+
+    // begins in unknown state
+    assertFalse(v.isKnown());
+    // calling getResult in unknown state throws
+    try {
+      v.getResult();
+      fail("this should throw");
+    } catch (IllegalStateException e) {
+      assertEquals("getResult() called on a ValueInspector whose result is not yet known!", e.getMessage());
+    }
+
+    // update state to known
+    v.update(10);
+
+    // v was updated with value 10, so result is known and should be true
+    assertTrue(v.isKnown());
+    assertTrue(v.getResult());
+
+    // calling update w/o resetting should throw
+    try {
+      v.update(11);
+      fail("this should throw");
+    } catch (IllegalStateException e) {
+      assertEquals("setResult() called on a ValueInspector whose result is already known!"
+          + " Did you forget to call reset()?", e.getMessage());
+    }
+
+    // back to unknown state
+    v.reset();
+
+    assertFalse(v.isKnown());
+    // calling getResult in unknown state throws
+    try {
+      v.getResult();
+      fail("this should throw");
+    } catch (IllegalStateException e) {
+      assertEquals("getResult() called on a ValueInspector whose result is not yet known!", e.getMessage());
+    }
+
+    // v was updated with value 11, so result is known and should be false
+    v.update(11);
+    assertTrue(v.isKnown());
+    assertFalse(v.getResult());
+
+  }
+
+  @Test
+  public void testReusable() {
+    List<Integer> values = Arrays.asList(2, 4, 7, 3, 8, 8, 11, 200);
+    ValueInspector v = intIsEven();
+
+    for (Integer x : values) {
+      v.update(x);
+      assertEquals(x % 2 == 0, v.getResult());
+      v.reset();
+    }
+
+  }
+}