You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2014/07/29 23:39:18 UTC

[4/4] git commit: Add a unified and optionally more constrained API for expressing filters on columns

Add a unified and optionally more constrained API for expressing filters on columns

This is a re-opened version of:
https://github.com/Parquet/parquet-mr/pull/412

The idea behind this pull request is to add a way to express filters on columns using DSL that allows parquet visibility into what is being filtered and how. This visibility will allow us to make optimizations at read time, the biggest one being filtering entire row groups or pages of records without even reading them based on the statistics / metadata that is stored along with each row group or page.

Included in this api are interfaces for user defined predicates, which must operate at the value level by may opt in to operating at the row group / page level as well. This should make this new API a superset of the `parquet.filter` package. This new api will need to be reconciled with the column filters currently in the `parquet.filter` package, but I wanted to get feedback on this first.

A limitation in both this api and the old one is that you can't do cross-column filters, eg: columX > columnY.

Author: Alex Levenson <al...@twitter.com>

Closes #4 from isnotinvain/alexlevenson/filter-api and squashes the following commits:

c1ab7e3 [Alex Levenson] Address feedback
c1bd610 [Alex Levenson] cleanup dotString in ColumnPath
418bfc1 [Alex Levenson] Update version, add temporary hacks for semantic enforcer
6643bd3 [Alex Levenson] Fix some more non backward incompatible changes
39f977f [Alex Levenson] Put a bunch of backwards compatible stuff back in, add @Deprecated
13a02c6 [Alex Levenson] Fix compile errors, add back in overloaded getRecordReader
f82edb7 [Alex Levenson] Merge branch 'master' into alexlevenson/filter-api
9bd014f [Alex Levenson] clean up TODOs and reference jiras
4cc7e87 [Alex Levenson] Add some comments
30e3d61 [Alex Levenson] Create a common interface for both kinds of filters
ac153a6 [Alex Levenson] Create a Statistics class for use in UDPs
fbbf601 [Alex Levenson] refactor IncrementallyUpdatedFilterPredicateGenerator to only generate the parts that require generation
5df47cd [Alex Levenson] Static imports of checkNotNull
c1d1823 [Alex Levenson] address some of the minor feedback items
67a3ba0 [Alex Levenson] update binary's toString
3d7372b [Alex Levenson] minor fixes
fed9531 [Alex Levenson] Add skipCurrentRecord method to clear events in thrift converter
2e632d5 [Alex Levenson] Make Binary Serializable
09c024f [Alex Levenson] update comments
3169849 [Alex Levenson] fix compilation error
0185030 [Alex Levenson] Add integration test for value level filters
4fde18c [Alex Levenson] move to right package
ae36b37 [Alex Levenson] Handle merge issues
af69486 [Alex Levenson] Merge branch 'master' into alexlevenson/filter-api
0665271 [Alex Levenson] Add tests for value inspector
c5e3b07 [Alex Levenson] Add tests for resetter and evaluator
29f677a [Alex Levenson] Fix scala DSL
8897a28 [Alex Levenson] Fix some tests
b448bee [Alex Levenson] Fix mistake in MessageColumnIO
c8133f8 [Alex Levenson] Fix some tests
4cf686d [Alex Levenson] more null checks
69e683b [Alex Levenson] check all the nulls
220a682 [Alex Levenson] more cleanup
aad5af3 [Alex Levenson] rm generated src file from git
5075243 [Alex Levenson] more minor cleanup
9966713 [Alex Levenson] Hook generation into maven build
8282725 [Alex Levenson] minor cleanup
fea3ea9 [Alex Levenson] minor cleanup
9e35406 [Alex Levenson] move statistics filter
c52750c [Alex Levenson] finish moving things around
97a6bfd [Alex Levenson] Move things around pt2
843b9fe [Alex Levenson] Move some files around pt 1
5eedcc0 [Alex Levenson] turn off dictionary support for AtomicConverter
541319e [Alex Levenson] various cleanup and fixes
08e9638 [Alex Levenson] rm ColumnPathUtil
bfe6795 [Alex Levenson] Add type bounds to FilterApi
6c831ab [Alex Levenson] don't double log exception in SerializationUtil
a7a58d1 [Alex Levenson] use ColumnPath instead of String
8f11a6b [Alex Levenson] Move ColumnPath and Canonicalizer to parquet-common
9164359 [Alex Levenson] stash
abc2be2 [Alex Levenson] Add null handling to record filters -- this impl is still broken though
90ba8f7 [Alex Levenson] Update Serialization Util
0a261f1 [Alex Levenson] Add compression in SerializationUtil
f1278be [Alex Levenson] Add comment, fix tests
cbd1a85 [Alex Levenson] Replace some specialization with generic views
e496cbf [Alex Levenson] Fix short circuiting in StatisticsFilter
db6b32d [Alex Levenson] Address some comments, fix constructor in ParquetReader
fd6f44d [Alex Levenson] Fix semver backward compat
2fdd304 [Alex Levenson] Some more cleanup
d34fb89 [Alex Levenson] Cleanup some TODOs
544499c [Alex Levenson] stash
7b32016 [Alex Levenson] Merge branch 'master' into alexlevenson/filter-api
0e31251 [Alex Levenson] First pass at values filter, needs reworking
470e409 [Alex Levenson] fix java6/7 bug, minor cleanup
ee7b221 [Alex Levenson] more InputFormat tests
5ef849e [Alex Levenson] Add guards for not specifying both kinds of filter
0186b1f [Alex Levenson] Add logging to ParquetInputFormat and tests for configuration
a622648 [Alex Levenson] cleanup imports
9b1ea88 [Alex Levenson] Add tests for statistics filter
d517373 [Alex Levenson] tests for filter validator
b25fc44 [Alex Levenson] small cleanup of filter validator
32067a1 [Alex Levenson] add test for collapse logical nots
1efc198 [Alex Levenson] Add tests for invert filter predicate
046b106 [Alex Levenson] some more fixes
d3c4d7a [Alex Levenson] fix some more types, add in test for SerializationUtil
cc51274 [Alex Levenson] fix generics in FilterPredicateInverter
ea08349 [Alex Levenson] First pass at rowgroup filter, needs testing
156d91b [Alex Levenson] Add runtime type checker
4dfb4f2 [Alex Levenson] Add serialization util
8f80b20 [Alex Levenson] update comment
7c25121 [Alex Levenson] Add class to Column struct
58f1190 [Alex Levenson] Remove filterByUniqueValues
7f20de6 [Alex Levenson] rename user predicates
af14b42 [Alex Levenson] Update dsl
04409c5 [Alex Levenson] Add generic types into Visitor
ba42884 [Alex Levenson] rm getClassName
65f8af9 [Alex Levenson] Add in support for user defined predicates on columns
6926337 [Alex Levenson] Add explicit tokens for notEq, ltEq, gtEq
667ec9f [Alex Levenson] remove test for collapsing double negation
db2f71a [Alex Levenson] rename FilterPredicatesTest
a0a0533 [Alex Levenson] Address first round of comments
b2bca94 [Alex Levenson] Add scala DSL and tests
bedda87 [Alex Levenson] Add tests for FilterPredicate building
238cbbe [Alex Levenson] Add scala dsl
39f7b24 [Alex Levenson] add scala mvn boilerplate
2ec71a7 [Alex Levenson] Add predicate API


Project: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/commit/ad32bf0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/tree/ad32bf0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/diff/ad32bf0f

Branch: refs/heads/master
Commit: ad32bf0fd111ab473ad1080cde11de39e3c5a67f
Parents: fc2c29d
Author: Alex Levenson <al...@twitter.com>
Authored: Tue Jul 29 14:38:59 2014 -0700
Committer: julien <ju...@twitter.com>
Committed: Tue Jul 29 14:38:59 2014 -0700

----------------------------------------------------------------------
 parquet-avro/pom.xml                            |   2 +-
 .../java/parquet/avro/AvroParquetReader.java    |  35 +-
 parquet-cascading/pom.xml                       |   2 +-
 parquet-column/pom.xml                          |  22 +-
 .../column/statistics/BinaryStatistics.java     |  12 +-
 .../column/statistics/BooleanStatistics.java    |  12 +-
 .../column/statistics/DoubleStatistics.java     |  12 +-
 .../column/statistics/FloatStatistics.java      |  12 +-
 .../column/statistics/IntStatistics.java        |  12 +-
 .../column/statistics/LongStatistics.java       |  12 +-
 .../parquet/column/statistics/Statistics.java   |   5 +-
 .../parquet/filter2/compat/FilterCompat.java    | 140 ++++++
 .../parquet/filter2/predicate/FilterApi.java    | 177 +++++++
 .../filter2/predicate/FilterPredicate.java      |  54 +++
 .../predicate/LogicalInverseRewriter.java       |  95 ++++
 .../filter2/predicate/LogicalInverter.java      |  90 ++++
 .../parquet/filter2/predicate/Operators.java    | 455 +++++++++++++++++
 .../predicate/SchemaCompatibilityValidator.java | 172 +++++++
 .../parquet/filter2/predicate/Statistics.java   |  24 +
 .../filter2/predicate/UserDefinedPredicate.java |  90 ++++
 .../parquet/filter2/predicate/ValidTypeMap.java | 160 ++++++
 .../recordlevel/FilteringGroupConverter.java    |  97 ++++
 .../FilteringPrimitiveConverter.java            |  91 ++++
 .../FilteringRecordMaterializer.java            |  97 ++++
 .../IncrementallyUpdatedFilterPredicate.java    | 139 ++++++
 ...ntallyUpdatedFilterPredicateBuilderBase.java |  79 +++
 ...mentallyUpdatedFilterPredicateEvaluator.java |  45 ++
 ...ementallyUpdatedFilterPredicateResetter.java |  42 ++
 .../java/parquet/io/FilteredRecordReader.java   |   6 +
 .../main/java/parquet/io/MessageColumnIO.java   | 100 +++-
 .../src/main/java/parquet/io/RecordReader.java  |   9 +-
 .../parquet/io/RecordReaderImplementation.java  |  14 +-
 .../src/main/java/parquet/io/api/Binary.java    | 486 +++++++++++--------
 .../java/parquet/io/api/RecordMaterializer.java |   5 +
 .../parquet/filter2/predicate/DummyUdp.java     |  19 +
 .../filter2/predicate/TestFilterApiMethods.java | 103 ++++
 .../predicate/TestLogicalInverseRewriter.java   |  85 ++++
 .../filter2/predicate/TestLogicalInverter.java  |  76 +++
 .../TestSchemaCompatibilityValidator.java       | 124 +++++
 .../filter2/predicate/TestValidTypeMap.java     |  93 ++++
 ...mentallyUpdatedFilterPredicateEvaluator.java | 191 ++++++++
 ...ementallyUpdatedFilterPredicateResetter.java |  51 ++
 .../filter2/recordlevel/TestValueInspector.java |  79 +++
 .../src/test/java/parquet/io/TestFiltered.java  |  58 +--
 parquet-common/pom.xml                          |   2 +-
 .../src/main/java/parquet/Closeables.java       |  37 ++
 .../parquet/common/internal/Canonicalizer.java  |  59 +++
 .../java/parquet/common/schema/ColumnPath.java  |  96 ++++
 parquet-encoding/pom.xml                        |   2 +-
 parquet-generator/pom.xml                       |   2 +-
 .../main/java/parquet/encoding/Generator.java   |   2 +-
 .../main/java/parquet/filter2/Generator.java    |  10 +
 ...mentallyUpdatedFilterPredicateGenerator.java | 251 ++++++++++
 parquet-hadoop-bundle/pom.xml                   |   2 +-
 parquet-hadoop/pom.xml                          |   4 +-
 .../parquet/filter2/compat/RowGroupFilter.java  |  63 +++
 .../statisticslevel/StatisticsFilter.java       | 244 ++++++++++
 .../converter/ParquetMetadataConverter.java     |  12 +-
 .../hadoop/InternalParquetRecordReader.java     |  59 ++-
 .../java/parquet/hadoop/ParquetFileReader.java  |  12 +-
 .../java/parquet/hadoop/ParquetFileWriter.java  |   8 +-
 .../java/parquet/hadoop/ParquetInputFormat.java | 118 ++++-
 .../java/parquet/hadoop/ParquetInputSplit.java  |   2 +-
 .../main/java/parquet/hadoop/ParquetReader.java |  96 +++-
 .../parquet/hadoop/ParquetRecordReader.java     |  24 +-
 .../main/java/parquet/hadoop/ParquetWriter.java |  13 +
 .../mapred/DeprecatedParquetInputFormat.java    |  13 +-
 .../parquet/hadoop/metadata/Canonicalizer.java  |  59 ---
 .../hadoop/metadata/ColumnChunkMetaData.java    |   3 +-
 .../hadoop/metadata/ColumnChunkProperties.java  |   2 +
 .../parquet/hadoop/metadata/ColumnPath.java     |  73 ---
 .../parquet/hadoop/metadata/EncodingList.java   |   1 +
 .../parquet/hadoop/util/SerializationUtil.java  |  93 ++++
 .../filter2/compat/TestRowGroupFilter.java      |  84 ++++
 .../filter2/recordlevel/PhoneBookWriter.java    | 251 ++++++++++
 .../recordlevel/TestRecordLevelFilters.java     | 205 ++++++++
 .../statisticslevel/TestStatisticsFilter.java   | 307 ++++++++++++
 .../java/parquet/hadoop/TestInputFormat.java    | 110 ++++-
 .../metadata/TestColumnChunkMetaData.java       |   9 +-
 .../hadoop/util/TestSerializationUtil.java      |  53 ++
 parquet-hive-bundle/pom.xml                     |   2 +-
 .../parquet-hive-0.10-binding/pom.xml           |   2 +-
 .../parquet-hive-0.12-binding/pom.xml           |   2 +-
 .../parquet-hive-binding-bundle/pom.xml         |   2 +-
 .../parquet-hive-binding-factory/pom.xml        |   2 +-
 .../parquet-hive-binding-interface/pom.xml      |   2 +-
 parquet-hive/parquet-hive-binding/pom.xml       |   2 +-
 .../parquet-hive-storage-handler/pom.xml        |   2 +-
 parquet-hive/pom.xml                            |   2 +-
 parquet-jackson/pom.xml                         |   2 +-
 parquet-pig-bundle/pom.xml                      |   2 +-
 parquet-pig/pom.xml                             |   2 +-
 parquet-protobuf/pom.xml                        |   2 +-
 .../java/parquet/proto/ProtoParquetReader.java  |  23 +-
 parquet-scala/pom.xml                           |  67 +++
 .../main/scala/parquet/filter2/dsl/Dsl.scala    |  89 ++++
 .../scala/parquet/filter2/dsl/DslTest.scala     |  61 +++
 parquet-scrooge/pom.xml                         |   4 +-
 parquet-test-hadoop2/pom.xml                    |   2 +-
 parquet-thrift/pom.xml                          |   2 +-
 .../parquet/thrift/ThriftParquetReader.java     |  64 ++-
 .../parquet/thrift/ThriftRecordConverter.java   |   5 +
 parquet-tools/pom.xml                           |   2 +-
 .../java/parquet/tools/command/DumpCommand.java |   1 -
 pom.xml                                         |  14 +-
 105 files changed, 5871 insertions(+), 554 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-avro/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index 16f34f7..9556a60 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java b/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java
index f002f21..54cfc8b 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroParquetReader.java
@@ -17,34 +17,51 @@ package parquet.avro;
 
 import java.io.IOException;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
 import parquet.filter.UnboundRecordFilter;
 import parquet.hadoop.ParquetReader;
-import parquet.hadoop.api.ReadSupport;
 
 /**
  * Read Avro records from a Parquet file.
  */
 public class AvroParquetReader<T extends IndexedRecord> extends ParquetReader<T> {
 
+  public Builder<T> builder(Path file) {
+    return ParquetReader.builder(new AvroReadSupport<T>(), file);
+  }
+
+  /**
+   * @deprecated use {@link #builder(Path)}
+   */
+  @Deprecated
   public AvroParquetReader(Path file) throws IOException {
-    super(file, (ReadSupport<T>) new AvroReadSupport());
+    super(file, new AvroReadSupport<T>());
   }
 
-  public AvroParquetReader(Path file, UnboundRecordFilter recordFilter) throws IOException {
-    super(file, (ReadSupport<T>) new AvroReadSupport(), recordFilter);
+  /**
+   * @deprecated use {@link #builder(Path)}
+   */
+  @Deprecated
+  public AvroParquetReader(Path file, UnboundRecordFilter unboundRecordFilter) throws IOException {
+    super(file, new AvroReadSupport<T>(), unboundRecordFilter);
   }
 
+  /**
+   * @deprecated use {@link #builder(Path)}
+   */
+  @Deprecated
   public AvroParquetReader(Configuration conf, Path file) throws IOException {
-    super(conf, file, (ReadSupport<T>) new AvroReadSupport());
+    super(conf, file, new AvroReadSupport<T>());
   }
 
-  public AvroParquetReader(Configuration conf, Path file, UnboundRecordFilter recordFilter ) throws IOException {
-    super(conf, file, (ReadSupport<T>) new AvroReadSupport(), recordFilter);
+  /**
+   * @deprecated use {@link #builder(Path)}
+   */
+  @Deprecated
+  public AvroParquetReader(Configuration conf, Path file, UnboundRecordFilter unboundRecordFilter) throws IOException {
+    super(conf, file, new AvroReadSupport<T>(), unboundRecordFilter);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-cascading/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-cascading/pom.xml b/parquet-cascading/pom.xml
index 6b2bd45..ad209af 100644
--- a/parquet-cascading/pom.xml
+++ b/parquet-cascading/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index 10f469d..94473f6 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
@@ -96,6 +96,26 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2.1</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>java</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <mainClass>parquet.filter2.Generator</mainClass>          
+          <arguments>
+            <argument>${basedir}/target/generated-src</argument>
+          </arguments>
+          <sourceRoot>${basedir}/target/generated-src</sourceRoot>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/column/statistics/BinaryStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/parquet/column/statistics/BinaryStatistics.java
index 8072439..f125b2f 100644
--- a/parquet-column/src/main/java/parquet/column/statistics/BinaryStatistics.java
+++ b/parquet-column/src/main/java/parquet/column/statistics/BinaryStatistics.java
@@ -17,7 +17,7 @@ package parquet.column.statistics;
 
 import parquet.io.api.Binary;
 
-public class BinaryStatistics extends Statistics{
+public class BinaryStatistics extends Statistics<Binary> {
 
   private Binary max;
   private Binary min;
@@ -77,6 +77,16 @@ public class BinaryStatistics extends Statistics{
       this.markAsNotEmpty();
   }
 
+  @Override
+  public Binary genericGetMin() {
+    return min;
+  }
+
+  @Override
+  public Binary genericGetMax() {
+    return max;
+  }
+
   public Binary getMax() {
     return max;
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/column/statistics/BooleanStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/statistics/BooleanStatistics.java b/parquet-column/src/main/java/parquet/column/statistics/BooleanStatistics.java
index 4552fd4..6741343 100644
--- a/parquet-column/src/main/java/parquet/column/statistics/BooleanStatistics.java
+++ b/parquet-column/src/main/java/parquet/column/statistics/BooleanStatistics.java
@@ -17,7 +17,7 @@ package parquet.column.statistics;
 
 import parquet.bytes.BytesUtils;
 
-public class BooleanStatistics extends Statistics{
+public class BooleanStatistics extends Statistics<Boolean> {
 
   private boolean max;
   private boolean min;
@@ -77,6 +77,16 @@ public class BooleanStatistics extends Statistics{
       this.markAsNotEmpty();
   }
 
+  @Override
+  public Boolean genericGetMin() {
+    return min;
+  }
+
+  @Override
+  public Boolean genericGetMax() {
+    return max;
+  }
+
   public boolean getMax() {
     return max;
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/column/statistics/DoubleStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/statistics/DoubleStatistics.java b/parquet-column/src/main/java/parquet/column/statistics/DoubleStatistics.java
index 5dfe161..c9695f3 100644
--- a/parquet-column/src/main/java/parquet/column/statistics/DoubleStatistics.java
+++ b/parquet-column/src/main/java/parquet/column/statistics/DoubleStatistics.java
@@ -17,7 +17,7 @@ package parquet.column.statistics;
 
 import parquet.bytes.BytesUtils;
 
-public class DoubleStatistics extends Statistics{
+public class DoubleStatistics extends Statistics<Double> {
 
   private double max;
   private double min;
@@ -77,6 +77,16 @@ public class DoubleStatistics extends Statistics{
       this.markAsNotEmpty();
   }
 
+  @Override
+  public Double genericGetMin() {
+    return min;
+  }
+
+  @Override
+  public Double genericGetMax() {
+    return max;
+  }
+
   public double getMax() {
     return max;
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/column/statistics/FloatStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/statistics/FloatStatistics.java b/parquet-column/src/main/java/parquet/column/statistics/FloatStatistics.java
index 1e85839..b13aafa 100644
--- a/parquet-column/src/main/java/parquet/column/statistics/FloatStatistics.java
+++ b/parquet-column/src/main/java/parquet/column/statistics/FloatStatistics.java
@@ -17,7 +17,7 @@ package parquet.column.statistics;
 
 import parquet.bytes.BytesUtils;
 
-public class FloatStatistics extends Statistics{
+public class FloatStatistics extends Statistics<Float> {
 
   private float max;
   private float min;
@@ -77,6 +77,16 @@ public class FloatStatistics extends Statistics{
       this.markAsNotEmpty();
   }
 
+  @Override
+  public Float genericGetMin() {
+    return min;
+  }
+
+  @Override
+  public Float genericGetMax() {
+    return max;
+  }
+
   public float getMax() {
     return max;
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/column/statistics/IntStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/statistics/IntStatistics.java b/parquet-column/src/main/java/parquet/column/statistics/IntStatistics.java
index 5871553..7bdd6be 100644
--- a/parquet-column/src/main/java/parquet/column/statistics/IntStatistics.java
+++ b/parquet-column/src/main/java/parquet/column/statistics/IntStatistics.java
@@ -17,7 +17,7 @@ package parquet.column.statistics;
 
 import parquet.bytes.BytesUtils;
 
-public class IntStatistics extends Statistics{
+public class IntStatistics extends Statistics<Integer> {
 
   private int max;
   private int min;
@@ -77,6 +77,16 @@ public class IntStatistics extends Statistics{
       this.markAsNotEmpty();
   }
 
+  @Override
+  public Integer genericGetMin() {
+    return min;
+  }
+
+  @Override
+  public Integer genericGetMax() {
+    return max;
+  }
+
   public int getMax() {
     return max;
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/column/statistics/LongStatistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/statistics/LongStatistics.java b/parquet-column/src/main/java/parquet/column/statistics/LongStatistics.java
index c1cf94e..bae63a9 100644
--- a/parquet-column/src/main/java/parquet/column/statistics/LongStatistics.java
+++ b/parquet-column/src/main/java/parquet/column/statistics/LongStatistics.java
@@ -17,7 +17,7 @@ package parquet.column.statistics;
 
 import parquet.bytes.BytesUtils;
 
-public class LongStatistics extends Statistics{
+public class LongStatistics extends Statistics<Long> {
 
   private long max;
   private long min;
@@ -77,6 +77,16 @@ public class LongStatistics extends Statistics{
       this.markAsNotEmpty();
   }
 
+  @Override
+  public Long genericGetMin() {
+    return min;
+  }
+
+  @Override
+  public Long genericGetMax() {
+    return max;
+  }
+
   public long getMax() {
     return max;
   }

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/column/statistics/Statistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/column/statistics/Statistics.java b/parquet-column/src/main/java/parquet/column/statistics/Statistics.java
index 2c5ac14..b29b76b 100644
--- a/parquet-column/src/main/java/parquet/column/statistics/Statistics.java
+++ b/parquet-column/src/main/java/parquet/column/statistics/Statistics.java
@@ -26,7 +26,7 @@ import java.util.Arrays;
  *
  * @author Katya Gonina
  */
-public abstract class Statistics {
+public abstract class Statistics<T extends Comparable<T>> {
 
   private boolean firstValueAccountedFor;
   private long num_nulls;
@@ -162,6 +162,9 @@ public abstract class Statistics {
    */
   abstract public void setMinMaxFromBytes(byte[] minBytes, byte[] maxBytes);
 
+  abstract public T genericGetMin();
+  abstract public T genericGetMax();
+
   /**
    * Abstract method to return the max value as a byte array
    * @return byte array corresponding to the max value

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/compat/FilterCompat.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/compat/FilterCompat.java b/parquet-column/src/main/java/parquet/filter2/compat/FilterCompat.java
new file mode 100644
index 0000000..826bd52
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/compat/FilterCompat.java
@@ -0,0 +1,140 @@
+package parquet.filter2.compat;
+
+import parquet.Log;
+import parquet.filter.UnboundRecordFilter;
+import parquet.filter2.predicate.FilterPredicate;
+import parquet.filter2.predicate.LogicalInverseRewriter;
+
+import static parquet.Preconditions.checkArgument;
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * Parquet currently has two ways to specify a filter for dropping records at read time.
+ * The first way, that only supports filtering records during record assembly, is found
+ * in {@link parquet.filter}. The new API (found in {@link parquet.filter2}) supports
+ * also filtering entire rowgroups of records without reading them at all.
+ *
+ * This class defines a common interface that both of these filters share,
+ * {@link Filter}. A Filter can be either an {@link UnboundRecordFilter} from the old API, or
+ * a {@link FilterPredicate} from the new API, or a sentinel no-op filter.
+ *
+ * Having this common interface simplifies passing a filter through the read path of parquet's
+ * codebase.
+ */
+public class FilterCompat {
+  private static final Log LOG = Log.getLog(FilterCompat.class);
+
+  /**
+   * Anyone wanting to use a {@link Filter} need only implement this interface,
+   * per the visitor pattern.
+   */
+  public static interface Visitor<T> {
+    T visit(FilterPredicateCompat filterPredicateCompat);
+    T visit(UnboundRecordFilterCompat unboundRecordFilterCompat);
+    T visit(NoOpFilter noOpFilter);
+  }
+
+  public static interface Filter {
+    <R> R accept(Visitor<R> visitor);
+  }
+
+  // sentinel no op filter that signals "do no filtering"
+  public static final Filter NOOP = new NoOpFilter();
+
+  /**
+   * Given a FilterPredicate, return a Filter that wraps it.
+   * This method also logs the filter being used and rewrites
+   * the predicate to not include the not() operator.
+   */
+  public static Filter get(FilterPredicate filterPredicate) {
+    checkNotNull(filterPredicate, "filterPredicate");
+
+    LOG.info("Filtering using predicate: " + filterPredicate);
+
+    // rewrite the predicate to not include the not() operator
+    FilterPredicate collapsedPredicate = LogicalInverseRewriter.rewrite(filterPredicate);
+
+    if (!filterPredicate.equals(collapsedPredicate)) {
+      LOG.info("Predicate has been collapsed to: " + collapsedPredicate);
+    }
+
+    return new FilterPredicateCompat(collapsedPredicate);
+  }
+
+  /**
+   * Given an UnboundRecordFilter, return a Filter that wraps it.
+   */
+  public static Filter get(UnboundRecordFilter unboundRecordFilter) {
+    return new UnboundRecordFilterCompat(unboundRecordFilter);
+  }
+
+  /**
+   * Given either a FilterPredicate or the class of an UnboundRecordFilter, or neither (but not both)
+   * return a Filter that wraps whichever was provided.
+   *
+   * Either filterPredicate or unboundRecordFilterClass must be null, or an exception is thrown.
+   *
+   * If both are null, the no op filter will be returned.
+   */
+  public static Filter get(FilterPredicate filterPredicate, UnboundRecordFilter unboundRecordFilter) {
+    checkArgument(filterPredicate == null || unboundRecordFilter == null,
+        "Cannot provide both a FilterPredicate and an UnboundRecordFilter");
+
+    if (filterPredicate != null) {
+      return get(filterPredicate);
+    }
+
+    if (unboundRecordFilter != null) {
+      return get(unboundRecordFilter);
+    }
+
+    return NOOP;
+  }
+
+  // wraps a FilterPredicate
+  public static final class FilterPredicateCompat implements Filter {
+    private final FilterPredicate filterPredicate;
+
+    private FilterPredicateCompat(FilterPredicate filterPredicate) {
+      this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate");
+    }
+
+    public FilterPredicate getFilterPredicate() {
+      return filterPredicate;
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  // wraps an UnboundRecordFilter
+  public static final class UnboundRecordFilterCompat implements Filter {
+    private final UnboundRecordFilter unboundRecordFilter;
+
+    private UnboundRecordFilterCompat(UnboundRecordFilter unboundRecordFilter) {
+      this.unboundRecordFilter = checkNotNull(unboundRecordFilter, "unboundRecordFilter");
+    }
+
+    public UnboundRecordFilter getUnboundRecordFilter() {
+      return unboundRecordFilter;
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  // sentinel no op filter
+  public static final class NoOpFilter implements Filter {
+    private NoOpFilter() {}
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/predicate/FilterApi.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/FilterApi.java b/parquet-column/src/main/java/parquet/filter2/predicate/FilterApi.java
new file mode 100644
index 0000000..1dd2bbc
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/predicate/FilterApi.java
@@ -0,0 +1,177 @@
+package parquet.filter2.predicate;
+
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.predicate.Operators.And;
+import parquet.filter2.predicate.Operators.BinaryColumn;
+import parquet.filter2.predicate.Operators.BooleanColumn;
+import parquet.filter2.predicate.Operators.Column;
+import parquet.filter2.predicate.Operators.DoubleColumn;
+import parquet.filter2.predicate.Operators.Eq;
+import parquet.filter2.predicate.Operators.FloatColumn;
+import parquet.filter2.predicate.Operators.Gt;
+import parquet.filter2.predicate.Operators.GtEq;
+import parquet.filter2.predicate.Operators.IntColumn;
+import parquet.filter2.predicate.Operators.LongColumn;
+import parquet.filter2.predicate.Operators.Lt;
+import parquet.filter2.predicate.Operators.LtEq;
+import parquet.filter2.predicate.Operators.Not;
+import parquet.filter2.predicate.Operators.NotEq;
+import parquet.filter2.predicate.Operators.Or;
+import parquet.filter2.predicate.Operators.SupportsEqNotEq;
+import parquet.filter2.predicate.Operators.SupportsLtGt;
+import parquet.filter2.predicate.Operators.UserDefined;
+
+/**
+ * The Filter API is expressed through these static methods.
+ *
+ * Example usage:
+ * {@code
+ *
+ *   IntColumn foo = intColumn("foo");
+ *   DoubleColumn bar = doubleColumn("x.y.bar");
+ *
+ *   // foo == 10 || bar <= 17.0
+ *   FilterPredicate pred = or(eq(foo, 10), ltEq(bar, 17.0));
+ *
+ * }
+ */
+// TODO: Support repeated columns (https://issues.apache.org/jira/browse/PARQUET-34)
+//
+// TODO: Support filtering on groups (eg, filter where this group is / isn't null)
+// TODO: (https://issues.apache.org/jira/browse/PARQUET-43)
+
+// TODO: Consider adding support for more column types that aren't coupled with parquet types, eg Column<String>
+// TODO: (https://issues.apache.org/jira/browse/PARQUET-35)
+public final class FilterApi {
+  private FilterApi() { }
+
+  public static IntColumn intColumn(String columnPath) {
+    return new IntColumn(ColumnPath.fromDotString(columnPath));
+  }
+
+  public static LongColumn longColumn(String columnPath) {
+    return new LongColumn(ColumnPath.fromDotString(columnPath));
+  }
+
+  public static FloatColumn floatColumn(String columnPath) {
+    return new FloatColumn(ColumnPath.fromDotString(columnPath));
+  }
+
+  public static DoubleColumn doubleColumn(String columnPath) {
+    return new DoubleColumn(ColumnPath.fromDotString(columnPath));
+  }
+
+  public static BooleanColumn booleanColumn(String columnPath) {
+    return new BooleanColumn(ColumnPath.fromDotString(columnPath));
+  }
+
+  public static BinaryColumn binaryColumn(String columnPath) {
+    return new BinaryColumn(ColumnPath.fromDotString(columnPath));
+  }
+
+  /**
+   * Keeps records if their value is equal to the provided value.
+   * Nulls are treated the same way the java programming language does.
+   * For example:
+   *   eq(column, null) will keep all records whose value is null.
+   *   eq(column, 7) will keep all records whose value is 7, and will drop records whose value is null
+   */
+  public static <T extends Comparable<T>, C extends Column<T> & SupportsEqNotEq> Eq<T> eq(C column, T value) {
+    return new Eq<T>(column, value);
+  }
+
+  /**
+   * Keeps records if their value is not equal to the provided value.
+   * Nulls are treated the same way the java programming language does.
+   * For example:
+   *   notEq(column, null) will keep all records whose value is not null.
+   *   notEq(column, 7) will keep all records whose value is not 7, including records whose value is null.
+   *
+   *   NOTE: this is different from how some query languages handle null. For example, SQL and pig will drop
+   *   nulls when you filter by not equal to 7. To achieve similar behavior in this api, do:
+   *   and(notEq(column, 7), notEq(column, null))
+   *
+   *   NOTE: be sure to read the {@link #lt}, {@link #ltEq}, {@link #gt}, {@link #gtEq} operator's docs
+   *         for how they handle nulls
+   */
+  public static <T extends Comparable<T>, C extends Column<T> & SupportsEqNotEq> NotEq<T> notEq(C column, T value) {
+    return new NotEq<T>(column, value);
+  }
+
+  /**
+   * Keeps records if their value is less than (but not equal to) the provided value.
+   * The provided value cannot be null, as less than null has no meaning.
+   * Records with null values will be dropped.
+   * For example:
+   *   lt(column, 7) will keep all records whose value is less than (but not equal to) 7, and not null.
+   */
+  public static <T extends Comparable<T>, C extends Column<T> & SupportsLtGt> Lt<T> lt(C column, T value) {
+    return new Lt<T>(column, value);
+  }
+
+  /**
+   * Keeps records if their value is less than or equal to the provided value.
+   * The provided value cannot be null, as less than null has no meaning.
+   * Records with null values will be dropped.
+   * For example:
+   *   ltEq(column, 7) will keep all records whose value is less than or equal to 7, and not null.
+   */
+  public static <T extends Comparable<T>, C extends Column<T> & SupportsLtGt> LtEq<T> ltEq(C column, T value) {
+    return new LtEq<T>(column, value);
+  }
+
+  /**
+   * Keeps records if their value is greater than (but not equal to) the provided value.
+   * The provided value cannot be null, as less than null has no meaning.
+   * Records with null values will be dropped.
+   * For example:
+   *   gt(column, 7) will keep all records whose value is greater than (but not equal to) 7, and not null.
+   */
+  public static <T extends Comparable<T>, C extends Column<T> & SupportsLtGt> Gt<T> gt(C column, T value) {
+    return new Gt<T>(column, value);
+  }
+
+  /**
+   * Keeps records if their value is greater than or equal to the provided value.
+   * The provided value cannot be null, as less than null has no meaning.
+   * Records with null values will be dropped.
+   * For example:
+   *   gtEq(column, 7) will keep all records whose value is greater than or equal to 7, and not null.
+   */
+  public static <T extends Comparable<T>, C extends Column<T> & SupportsLtGt> GtEq<T> gtEq(C column, T value) {
+    return new GtEq<T>(column, value);
+  }
+
+  /**
+   * Keeps records that pass the provided {@link UserDefinedPredicate}
+   */
+  public static <T extends Comparable<T>, U extends UserDefinedPredicate<T>>
+    UserDefined<T, U> userDefined(Column<T> column, Class<U> clazz) {
+    return new UserDefined<T, U>(column, clazz);
+  }
+
+  /**
+   * Constructs the logical and of two predicates. Records will be kept if both the left and right predicate agree
+   * that the record should be kept.
+   */
+  public static FilterPredicate and(FilterPredicate left, FilterPredicate right) {
+    return new And(left, right);
+  }
+
+  /**
+   * Constructs the logical or of two predicates. Records will be kept if either the left or right predicate
+   * is satisfied (or both).
+   */
+  public static FilterPredicate or(FilterPredicate left, FilterPredicate right) {
+    return new Or(left, right);
+  }
+
+  /**
+   * Constructs the logical not (or inverse) of a predicate.
+   * Records will be kept if the provided predicate is not satisfied.
+   */
+  public static FilterPredicate not(FilterPredicate predicate) {
+    return new Not(predicate);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/predicate/FilterPredicate.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/FilterPredicate.java b/parquet-column/src/main/java/parquet/filter2/predicate/FilterPredicate.java
new file mode 100644
index 0000000..9cdaabe
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/predicate/FilterPredicate.java
@@ -0,0 +1,54 @@
+package parquet.filter2.predicate;
+
+import parquet.filter2.predicate.Operators.And;
+import parquet.filter2.predicate.Operators.Eq;
+import parquet.filter2.predicate.Operators.Gt;
+import parquet.filter2.predicate.Operators.GtEq;
+import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import parquet.filter2.predicate.Operators.Lt;
+import parquet.filter2.predicate.Operators.LtEq;
+import parquet.filter2.predicate.Operators.Not;
+import parquet.filter2.predicate.Operators.NotEq;
+import parquet.filter2.predicate.Operators.Or;
+import parquet.filter2.predicate.Operators.UserDefined;
+
+/**
+ * A FilterPredicate is an expression tree describing the criteria for which records to keep when loading data from
+ * a parquet file. These predicates are applied in multiple places. Currently, they are applied to all row groups at
+ * job submission time to see if we can potentially drop entire row groups, and then they are applied during column
+ * assembly to drop individual records that are not wanted.
+ *
+ * FilterPredicates do not contain closures or instances of anonymous classes, rather they are expressed as
+ * an expression tree of operators.
+ *
+ * FilterPredicates are implemented in terms of the visitor pattern.
+ *
+ * See {@link Operators} for the implementation of the operator tokens,
+ * and {@link FilterApi} for the dsl functions for constructing an expression tree.
+ */
+public interface FilterPredicate {
+
+  /**
+   * A FilterPredicate must accept a Visitor, per the visitor pattern.
+   */
+  <R> R accept(Visitor<R> visitor);
+
+  /**
+   * A FilterPredicate Visitor must visit all the operators in a FilterPredicate expression tree,
+   * and must handle recursion itself, per the visitor pattern.
+   */
+  public static interface Visitor<R> {
+    <T extends Comparable<T>> R visit(Eq<T> eq);
+    <T extends Comparable<T>> R visit(NotEq<T> notEq);
+    <T extends Comparable<T>> R visit(Lt<T> lt);
+    <T extends Comparable<T>> R visit(LtEq<T> ltEq);
+    <T extends Comparable<T>> R visit(Gt<T> gt);
+    <T extends Comparable<T>> R visit(GtEq<T> gtEq);
+    R visit(And and);
+    R visit(Or or);
+    R visit(Not not);
+    <T extends Comparable<T>, U extends UserDefinedPredicate<T>> R visit(UserDefined<T, U> udp);
+    <T extends Comparable<T>, U extends UserDefinedPredicate<T>> R visit(LogicalNotUserDefined<T, U> udp);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverseRewriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverseRewriter.java b/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverseRewriter.java
new file mode 100644
index 0000000..4151bff
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverseRewriter.java
@@ -0,0 +1,95 @@
+package parquet.filter2.predicate;
+
+import parquet.filter2.predicate.FilterPredicate.Visitor;
+import parquet.filter2.predicate.Operators.And;
+import parquet.filter2.predicate.Operators.Eq;
+import parquet.filter2.predicate.Operators.Gt;
+import parquet.filter2.predicate.Operators.GtEq;
+import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import parquet.filter2.predicate.Operators.Lt;
+import parquet.filter2.predicate.Operators.LtEq;
+import parquet.filter2.predicate.Operators.Not;
+import parquet.filter2.predicate.Operators.NotEq;
+import parquet.filter2.predicate.Operators.Or;
+import parquet.filter2.predicate.Operators.UserDefined;
+
+import static parquet.Preconditions.checkNotNull;
+import static parquet.filter2.predicate.FilterApi.and;
+import static parquet.filter2.predicate.FilterApi.or;
+
+/**
+ * Recursively removes all use of the not() operator in a predicate
+ * by replacing all instances of not(x) with the inverse(x),
+ * eg: not(and(eq(), not(eq(y))) -> or(notEq(), eq(y))
+ *
+ * The returned predicate should have the same meaning as the original, but
+ * without the use of the not() operator.
+ *
+ * See also {@link LogicalInverter}, which is used
+ * to do the inversion.
+ */
+public final class LogicalInverseRewriter implements Visitor<FilterPredicate> {
+  private static final LogicalInverseRewriter INSTANCE = new LogicalInverseRewriter();
+
+  public static FilterPredicate rewrite(FilterPredicate pred) {
+    checkNotNull(pred, "pred");
+    return pred.accept(INSTANCE);
+  }
+
+  private LogicalInverseRewriter() { }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(Eq<T> eq) {
+    return eq;
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(NotEq<T> notEq) {
+    return notEq;
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(Lt<T> lt) {
+    return lt;
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(LtEq<T> ltEq) {
+    return ltEq;
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(Gt<T> gt) {
+    return gt;
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(GtEq<T> gtEq) {
+    return gtEq;
+  }
+
+  @Override
+  public FilterPredicate visit(And and) {
+    return and(and.getLeft().accept(this), and.getRight().accept(this));
+  }
+
+  @Override
+  public FilterPredicate visit(Or or) {
+    return or(or.getLeft().accept(this), or.getRight().accept(this));
+  }
+
+  @Override
+  public FilterPredicate visit(Not not) {
+    return LogicalInverter.invert(not.getPredicate().accept(this));
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> FilterPredicate visit(UserDefined<T, U> udp) {
+    return udp;
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> FilterPredicate visit(LogicalNotUserDefined<T, U> udp) {
+    return udp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverter.java b/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverter.java
new file mode 100644
index 0000000..15b0715
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverter.java
@@ -0,0 +1,90 @@
+package parquet.filter2.predicate;
+
+import parquet.filter2.predicate.FilterPredicate.Visitor;
+import parquet.filter2.predicate.Operators.And;
+import parquet.filter2.predicate.Operators.Eq;
+import parquet.filter2.predicate.Operators.Gt;
+import parquet.filter2.predicate.Operators.GtEq;
+import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import parquet.filter2.predicate.Operators.Lt;
+import parquet.filter2.predicate.Operators.LtEq;
+import parquet.filter2.predicate.Operators.Not;
+import parquet.filter2.predicate.Operators.NotEq;
+import parquet.filter2.predicate.Operators.Or;
+import parquet.filter2.predicate.Operators.UserDefined;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * Converts a {@link FilterPredicate} to its logical inverse.
+ * The returned predicate should be equivalent to not(p), but without
+ * the use of a not() operator.
+ *
+ * See also {@link LogicalInverseRewriter}, which can remove the use
+ * of all not() operators without inverting the overall predicate.
+ */
+public final class LogicalInverter implements Visitor<FilterPredicate> {
+  private static final LogicalInverter INSTANCE = new LogicalInverter();
+
+  public static FilterPredicate invert(FilterPredicate pred) {
+    checkNotNull(pred, "pred");
+    return pred.accept(INSTANCE);
+  }
+
+  private LogicalInverter() {}
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(Eq<T> eq) {
+    return new NotEq<T>(eq.getColumn(), eq.getValue());
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(NotEq<T> notEq) {
+    return new Eq<T>(notEq.getColumn(), notEq.getValue());
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(Lt<T> lt) {
+    return new GtEq<T>(lt.getColumn(), lt.getValue());
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(LtEq<T> ltEq) {
+    return new Gt<T>(ltEq.getColumn(), ltEq.getValue());
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(Gt<T> gt) {
+    return new LtEq<T>(gt.getColumn(), gt.getValue());
+  }
+
+  @Override
+  public <T extends Comparable<T>> FilterPredicate visit(GtEq<T> gtEq) {
+    return new Lt<T>(gtEq.getColumn(), gtEq.getValue());
+  }
+
+  @Override
+  public FilterPredicate visit(And and) {
+    return new Or(and.getLeft().accept(this), and.getRight().accept(this));
+  }
+
+  @Override
+  public FilterPredicate visit(Or or) {
+    return new And(or.getLeft().accept(this), or.getRight().accept(this));
+  }
+
+  @Override
+  public FilterPredicate visit(Not not) {
+    return not.getPredicate();
+  }
+
+  @Override
+  public <T extends Comparable<T>,  U extends UserDefinedPredicate<T>> FilterPredicate visit(UserDefined<T, U> udp) {
+    return new LogicalNotUserDefined<T, U>(udp);
+  }
+
+  @Override
+  public <T extends Comparable<T>,  U extends UserDefinedPredicate<T>> FilterPredicate visit(LogicalNotUserDefined<T, U> udp) {
+    return udp.getUserDefined();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/predicate/Operators.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/Operators.java b/parquet-column/src/main/java/parquet/filter2/predicate/Operators.java
new file mode 100644
index 0000000..5d13f8c
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/predicate/Operators.java
@@ -0,0 +1,455 @@
+package parquet.filter2.predicate;
+
+import java.io.Serializable;
+
+import parquet.common.schema.ColumnPath;
+import parquet.io.api.Binary;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * These are the operators in a filter predicate expression tree.
+ * They are constructed by using the methods in {@link FilterApi}
+ */
+public final class Operators {
+  private Operators() { }
+
+  public static abstract class Column<T extends Comparable<T>> implements Serializable {
+    private final ColumnPath columnPath;
+    private final Class<T> columnType;
+
+    protected Column(ColumnPath columnPath, Class<T> columnType) {
+      checkNotNull(columnPath, "columnPath");
+      checkNotNull(columnType, "columnType");
+      this.columnPath = columnPath;
+      this.columnType = columnType;
+    }
+
+    public Class<T> getColumnType() {
+      return columnType;
+    }
+
+    public ColumnPath getColumnPath() {
+      return columnPath;
+    }
+
+    @Override
+    public String toString() {
+      return "column(" + columnPath.toDotString() + ")";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Column column = (Column) o;
+
+      if (!columnType.equals(column.columnType)) return false;
+      if (!columnPath.equals(column.columnPath)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = columnPath.hashCode();
+      result = 31 * result + columnType.hashCode();
+      return result;
+    }
+  }
+
+  public static interface SupportsEqNotEq { } // marker for columns that can be used with eq() and notEq()
+  public static interface SupportsLtGt extends SupportsEqNotEq { } // marker for columns that can be used with lt(), ltEq(), gt(), gtEq()
+
+  public static final class IntColumn extends Column<Integer> implements SupportsLtGt {
+    IntColumn(ColumnPath columnPath) {
+      super(columnPath, Integer.class);
+    }
+  }
+
+  public static final class LongColumn extends Column<Long> implements SupportsLtGt {
+    LongColumn(ColumnPath columnPath) {
+      super(columnPath, Long.class);
+    }
+  }
+
+  public static final class DoubleColumn extends Column<Double> implements SupportsLtGt {
+    DoubleColumn(ColumnPath columnPath) {
+      super(columnPath, Double.class);
+    }
+  }
+
+  public static final class FloatColumn extends Column<Float> implements SupportsLtGt {
+    FloatColumn(ColumnPath columnPath) {
+      super(columnPath, Float.class);
+    }
+  }
+
+  public static final class BooleanColumn extends Column<Boolean> implements SupportsEqNotEq {
+    BooleanColumn(ColumnPath columnPath) {
+      super(columnPath, Boolean.class);
+    }
+  }
+
+  public static final class BinaryColumn extends Column<Binary> implements SupportsLtGt {
+    BinaryColumn(ColumnPath columnPath) {
+      super(columnPath, Binary.class);
+    }
+  }
+
+  // base class for Eq, NotEq, Lt, Gt, LtEq, GtEq
+  static abstract class ColumnFilterPredicate<T extends Comparable<T>> implements FilterPredicate, Serializable  {
+    private final Column<T> column;
+    private final T value;
+    private final String toString;
+
+    protected ColumnFilterPredicate(Column<T> column, T value) {
+      this.column = checkNotNull(column, "column");
+
+      // Eq and NotEq allow value to be null, Lt, Gt, LtEq, GtEq however do not, so they guard against
+      // null in their own constructors.
+      this.value = value;
+
+      String name = getClass().getSimpleName().toLowerCase();
+      this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + value + ")";
+    }
+
+    public Column<T> getColumn() {
+      return column;
+    }
+
+    public T getValue() {
+      return value;
+    }
+
+    @Override
+    public String toString() {
+      return toString;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      ColumnFilterPredicate that = (ColumnFilterPredicate) o;
+
+      if (!column.equals(that.column)) return false;
+      if (value != null ? !value.equals(that.value) : that.value != null) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = column.hashCode();
+      result = 31 * result + (value != null ? value.hashCode() : 0);
+      result = 31 * result + getClass().hashCode();
+      return result;
+    }
+  }
+
+  public static final class Eq<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
+
+    // value can be null
+    Eq(Column<T> column, T value) {
+      super(column, value);
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+
+  }
+
+  public static final class NotEq<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
+
+    // value can be null
+    NotEq(Column<T> column, T value) {
+      super(column, value);
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+
+  public static final class Lt<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
+
+    // value cannot be null
+    Lt(Column<T> column, T value) {
+      super(column, checkNotNull(value, "value"));
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  public static final class LtEq<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
+
+    // value cannot be null
+    LtEq(Column<T> column, T value) {
+      super(column, checkNotNull(value, "value"));
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+
+  public static final class Gt<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
+
+    // value cannot be null
+    Gt(Column<T> column, T value) {
+      super(column, checkNotNull(value, "value"));
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  public static final class GtEq<T extends Comparable<T>> extends ColumnFilterPredicate<T> {
+
+    // value cannot be null
+    GtEq(Column<T> column, T value) {
+      super(column, checkNotNull(value, "value"));
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  // base class for And, Or
+  private static abstract class BinaryLogicalFilterPredicate implements FilterPredicate, Serializable {
+    private final FilterPredicate left;
+    private final FilterPredicate right;
+    private final String toString;
+
+    protected BinaryLogicalFilterPredicate(FilterPredicate left, FilterPredicate right) {
+      this.left = checkNotNull(left, "left");
+      this.right = checkNotNull(right, "right");
+      String name = getClass().getSimpleName().toLowerCase();
+      this.toString = name + "(" + left + ", " + right + ")";
+    }
+
+    public FilterPredicate getLeft() {
+      return left;
+    }
+
+    public FilterPredicate getRight() {
+      return right;
+    }
+
+    @Override
+    public String toString() {
+      return toString;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      BinaryLogicalFilterPredicate that = (BinaryLogicalFilterPredicate) o;
+
+      if (!left.equals(that.left)) return false;
+      if (!right.equals(that.right)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = left.hashCode();
+      result = 31 * result + right.hashCode();
+      result = 31 * result + getClass().hashCode();
+      return result;
+    }
+  }
+
+  public static final class And extends BinaryLogicalFilterPredicate {
+
+    And(FilterPredicate left, FilterPredicate right) {
+      super(left, right);
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  public static final class Or extends BinaryLogicalFilterPredicate {
+
+    Or(FilterPredicate left, FilterPredicate right) {
+      super(left, right);
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+  }
+
+  public static class Not implements FilterPredicate, Serializable {
+    private final FilterPredicate predicate;
+    private final String toString;
+
+    Not(FilterPredicate predicate) {
+      this.predicate = checkNotNull(predicate, "predicate");
+      this.toString = "not(" + predicate + ")";
+    }
+
+    public FilterPredicate getPredicate() {
+      return predicate;
+    }
+
+    @Override
+    public String toString() {
+      return toString;
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      Not not = (Not) o;
+      return predicate.equals(not.predicate);
+    }
+
+    @Override
+    public int hashCode() {
+      return predicate.hashCode() * 31 + getClass().hashCode();
+    }
+  }
+
+  public static final class UserDefined<T extends Comparable<T>, U extends UserDefinedPredicate<T>> implements FilterPredicate, Serializable {
+    private final Column<T> column;
+    private final Class<U> udpClass;
+    private final String toString;
+    private static final String INSTANTIATION_ERROR_MESSAGE =
+        "Could not instantiate custom filter: %s. User defined predicates must be static classes with a default constructor.";
+
+    UserDefined(Column<T> column, Class<U> udpClass) {
+      this.column = checkNotNull(column, "column");
+      this.udpClass = checkNotNull(udpClass, "udpClass");
+      String name = getClass().getSimpleName().toLowerCase();
+      this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpClass.getName() + ")";
+
+      // defensively try to instantiate the class early to make sure that it's possible
+      getUserDefinedPredicate();
+    }
+
+    public Column<T> getColumn() {
+      return column;
+    }
+
+    public Class<U> getUserDefinedPredicateClass() {
+      return udpClass;
+    }
+
+    public U getUserDefinedPredicate() {
+      try {
+        return udpClass.newInstance();
+      } catch (InstantiationException e) {
+        throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e);
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e);
+      }
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+
+    @Override
+    public String toString() {
+      return toString;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      UserDefined that = (UserDefined) o;
+
+      if (!column.equals(that.column)) return false;
+      if (!udpClass.equals(that.udpClass)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = column.hashCode();
+      result = 31 * result + udpClass.hashCode();
+      result = result * 31 + getClass().hashCode();
+      return result;
+    }
+  }
+
+  // Represents the inverse of a UserDefined. It is equivalent to not(userDefined), without the use
+  // of the not() operator
+  public static final class LogicalNotUserDefined <T extends Comparable<T>, U extends UserDefinedPredicate<T>> implements FilterPredicate, Serializable {
+    private final UserDefined<T, U> udp;
+    private final String toString;
+
+    LogicalNotUserDefined(UserDefined<T, U> userDefined) {
+      this.udp = checkNotNull(userDefined, "userDefined");
+      this.toString = "inverted(" + udp + ")";
+    }
+
+    public UserDefined<T, U> getUserDefined() {
+      return udp;
+    }
+
+    @Override
+    public <R> R accept(Visitor<R> visitor) {
+      return visitor.visit(this);
+    }
+
+    @Override
+    public String toString() {
+      return toString;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      LogicalNotUserDefined that = (LogicalNotUserDefined) o;
+
+      if (!udp.equals(that.udp)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = udp.hashCode();
+      result = result * 31 + getClass().hashCode();
+      return result;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/predicate/SchemaCompatibilityValidator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/SchemaCompatibilityValidator.java b/parquet-column/src/main/java/parquet/filter2/predicate/SchemaCompatibilityValidator.java
new file mode 100644
index 0000000..da0e122
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/predicate/SchemaCompatibilityValidator.java
@@ -0,0 +1,172 @@
+package parquet.filter2.predicate;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import parquet.column.ColumnDescriptor;
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.predicate.Operators.And;
+import parquet.filter2.predicate.Operators.Column;
+import parquet.filter2.predicate.Operators.ColumnFilterPredicate;
+import parquet.filter2.predicate.Operators.Eq;
+import parquet.filter2.predicate.Operators.Gt;
+import parquet.filter2.predicate.Operators.GtEq;
+import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import parquet.filter2.predicate.Operators.Lt;
+import parquet.filter2.predicate.Operators.LtEq;
+import parquet.filter2.predicate.Operators.Not;
+import parquet.filter2.predicate.Operators.NotEq;
+import parquet.filter2.predicate.Operators.Or;
+import parquet.filter2.predicate.Operators.UserDefined;
+import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
+
+import static parquet.Preconditions.checkArgument;
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * Inspects the column types found in the provided {@link FilterPredicate} and compares them
+ * to the actual schema found in the parquet file. If the provided predicate's types are
+ * not consistent with the file schema, and IllegalArgumentException is thrown.
+ *
+ * Ideally, all this would be checked at compile time, and this class wouldn't be needed.
+ * If we can come up with a way to do that, we should.
+ *
+ * This class is stateful, cannot be reused, and is not thread safe.
+ *
+ * TODO: detect if a column is optional or required and validate that eq(null)
+ * TODO: is not called on required fields (is that too strict?)
+ * TODO: (https://issues.apache.org/jira/browse/PARQUET-44)
+ */
+public class SchemaCompatibilityValidator implements FilterPredicate.Visitor<Void> {
+
+  public static void validate(FilterPredicate predicate, MessageType schema) {
+    checkNotNull(predicate, "predicate");
+    checkNotNull(schema, "schema");
+    predicate.accept(new SchemaCompatibilityValidator(schema));
+  }
+
+  // A map of column name to the type the user supplied for this column.
+  // Used to validate that the user did not provide different types for the same
+  // column.
+  private final Map<ColumnPath, Class<?>> columnTypesEncountered = new HashMap<ColumnPath, Class<?>>();
+
+  // the columns (keyed by path) according to the file's schema. This is the source of truth, and
+  // we are validating that what the user provided agrees with these.
+  private final Map<ColumnPath, ColumnDescriptor> columnsAccordingToSchema = new HashMap<ColumnPath, ColumnDescriptor>();
+
+  // the original type of a column, keyed by path
+  private final Map<ColumnPath, OriginalType> originalTypes = new HashMap<ColumnPath, OriginalType>();
+
+  private SchemaCompatibilityValidator(MessageType schema) {
+
+    for (ColumnDescriptor cd : schema.getColumns()) {
+      ColumnPath columnPath = ColumnPath.get(cd.getPath());
+      columnsAccordingToSchema.put(columnPath, cd);
+
+      OriginalType ot = schema.getType(cd.getPath()).getOriginalType();
+      if (ot != null) {
+        originalTypes.put(columnPath, ot);
+      }
+    }
+  }
+
+  @Override
+  public <T extends Comparable<T>> Void visit(Eq<T> pred) {
+    validateColumnFilterPredicate(pred);
+    return null;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Void visit(NotEq<T> pred) {
+    validateColumnFilterPredicate(pred);
+    return null;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Void visit(Lt<T> pred) {
+    validateColumnFilterPredicate(pred);
+    return null;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Void visit(LtEq<T> pred) {
+    validateColumnFilterPredicate(pred);
+    return null;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Void visit(Gt<T> pred) {
+    validateColumnFilterPredicate(pred);
+    return null;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Void visit(GtEq<T> pred) {
+    validateColumnFilterPredicate(pred);
+    return null;
+  }
+
+  @Override
+  public Void visit(And and) {
+    and.getLeft().accept(this);
+    and.getRight().accept(this);
+    return null;
+  }
+
+  @Override
+  public Void visit(Or or) {
+    or.getLeft().accept(this);
+    or.getRight().accept(this);
+    return null;
+  }
+
+  @Override
+  public Void visit(Not not) {
+    not.getPredicate().accept(this);
+    return null;
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Void visit(UserDefined<T, U> udp) {
+    validateColumn(udp.getColumn());
+    return null;
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Void visit(LogicalNotUserDefined<T, U> udp) {
+    return udp.getUserDefined().accept(this);
+  }
+
+  private <T extends Comparable<T>> void validateColumnFilterPredicate(ColumnFilterPredicate<T> pred) {
+    validateColumn(pred.getColumn());
+  }
+
+  private <T extends Comparable<T>> void validateColumn(Column<T> column) {
+    ColumnPath path = column.getColumnPath();
+
+    Class<?> alreadySeen = columnTypesEncountered.get(path);
+    if (alreadySeen != null && !alreadySeen.equals(column.getColumnType())) {
+      throw new IllegalArgumentException("Column: "
+          + path.toDotString()
+          + " was provided with different types in the same predicate."
+          + " Found both: (" + alreadySeen + ", " + column.getColumnType() + ")");
+    }
+    columnTypesEncountered.put(path, column.getColumnType());
+
+    ColumnDescriptor descriptor = getColumnDescriptor(path);
+
+    if (descriptor.getMaxRepetitionLevel() > 0) {
+      throw new IllegalArgumentException("FilterPredicates do not currently support repeated columns. "
+          + "Column " + path.toDotString() + " is repeated.");
+    }
+
+    ValidTypeMap.assertTypeValid(column, descriptor.getType(), originalTypes.get(path));
+  }
+
+  private ColumnDescriptor getColumnDescriptor(ColumnPath columnPath) {
+    ColumnDescriptor cd = columnsAccordingToSchema.get(columnPath);
+    checkArgument(cd != null, "Column " + columnPath + " was not found in schema!");
+    return cd;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/predicate/Statistics.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/Statistics.java b/parquet-column/src/main/java/parquet/filter2/predicate/Statistics.java
new file mode 100644
index 0000000..408bc54
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/predicate/Statistics.java
@@ -0,0 +1,24 @@
+package parquet.filter2.predicate;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * Contains statistics about a group of records
+ */
+public class Statistics<T> {
+  private final T min;
+  private final T max;
+
+  public Statistics(T min, T max) {
+    this.min = checkNotNull(min, "min");
+    this.max = checkNotNull(max, "max");
+  }
+
+  public T getMin() {
+    return min;
+  }
+
+  public T getMax() {
+    return max;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/predicate/UserDefinedPredicate.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/UserDefinedPredicate.java b/parquet-column/src/main/java/parquet/filter2/predicate/UserDefinedPredicate.java
new file mode 100644
index 0000000..99f6c76
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/predicate/UserDefinedPredicate.java
@@ -0,0 +1,90 @@
+package parquet.filter2.predicate;
+
+/**
+ * A UserDefinedPredicate decides whether a record should be kept or dropped, first by
+ * inspecting meta data about a group of records to see if the entire group can be dropped,
+ * then by inspecting actual values of a single column. These predicates can be combined into
+ * a complex boolean expression via the {@link FilterApi}.
+ *
+ * @param <T> The type of the column this predicate is applied to.
+ */
+// TODO: consider avoiding autoboxing and adding the specialized methods for each type
+// TODO: downside is that's fairly unwieldy for users
+public abstract class UserDefinedPredicate<T extends Comparable<T>> {
+
+  /**
+   * A udp must have a default constructor.
+   * The udp passed to {@link FilterApi} will not be serialized along with its state.
+   * Only its class name will be recorded, it will be instantiated reflectively via the default
+   * constructor.
+   */
+  public UserDefinedPredicate() { }
+
+  /**
+   * Return true to keep the record with this value, false to drop it.
+   */
+  public abstract boolean keep(T value);
+
+  /**
+   * Given information about a group of records (eg, the min and max value)
+   * Return true to drop all the records in this group, false to keep them for further
+   * inspection. Returning false here will cause the records to be loaded and each value
+   * will be passed to {@link #keep} to make the final decision.
+   *
+   * It is safe to always return false here, if you simply want to visit each record via the {@link #keep} method,
+   * though it is much more efficient to drop entire chunks of records here if you can.
+   */
+  public abstract boolean canDrop(Statistics<T> statistics);
+
+  /**
+   * Same as {@link #canDrop} except this method describes the logical inverse
+   * behavior of this predicate. If this predicate is passed to the not() operator, then
+   * {@link #inverseCanDrop} will be called instead of {@link #canDrop}
+   *
+   * It is safe to always return false here, if you simply want to visit each record via the {@link #keep} method,
+   * though it is much more efficient to drop entire chunks of records here if you can.
+   *
+   * It may be valid to simply return !canDrop(statistics) but that is not always the case.
+   * To illustrate, look at this re-implementation of a UDP that checks for values greater than 7:
+   *
+   * {@code 
+   * 
+   * // This is just an example, you should use the built in {@link FilterApi#gt} operator instead of
+   * // implementing your own like this.
+   *  
+   * public class IntGreaterThan7UDP extends UserDefinedPredicate<Integer> {
+   *   @Override
+   *   public boolean keep(Integer value) {
+   *     // here we just check if the value is greater than 7.
+   *     // here, parquet knows that if the predicate not(columnX, IntGreaterThan7UDP) is being evaluated,
+   *     // it is safe to simply use !IntEquals7UDP.keep(value)
+   *     return value > 7;
+   *   }
+   * 
+   *   @Override
+   *   public boolean canDrop(Statistics<Integer> statistics) {
+   *     // here we drop a group of records if they are all less than or equal to 7,
+   *     // (there can't possibly be any values greater than 7 in this group of records)
+   *     return statistics.getMax() <= 7;
+   *   }
+   * 
+   *   @Override
+   *   public boolean inverseCanDrop(Statistics<Integer> statistics) {
+   *     // here the predicate not(columnX, IntGreaterThan7UDP) is being evaluated, which means we want
+   *     // to keep all records whose value is is not greater than 7, or, rephrased, whose value is less than or equal to 7.
+   *     // notice what would happen if parquet just tried to evaluate !IntGreaterThan7UDP.canDrop():
+   *     // !IntGreaterThan7UDP.canDrop(stats) == !(stats.getMax() <= 7) == (stats.getMax() > 7)
+   *     // it would drop the following group of records: [100, 1, 2, 3], even though this group of records contains values
+   *     // less than than or equal to 7.
+   * 
+   *     // what we actually want to do is drop groups of records where the *min* is greater than 7, (not the max)
+   *     // for example: the group of records: [100, 8, 9, 10] has a min of 8, so there's no way there are going
+   *     // to be records with a value
+   *     // less than or equal to 7 in this group.
+   *     return statistics.getMin() > 7;
+   *   }
+   * }
+   * }
+   */
+  public abstract boolean inverseCanDrop(Statistics<T> statistics);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/predicate/ValidTypeMap.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/predicate/ValidTypeMap.java b/parquet-column/src/main/java/parquet/filter2/predicate/ValidTypeMap.java
new file mode 100644
index 0000000..6d216c3
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/predicate/ValidTypeMap.java
@@ -0,0 +1,160 @@
+package parquet.filter2.predicate;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.predicate.Operators.Column;
+import parquet.io.api.Binary;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+/**
+ * Contains all valid mappings from class -> parquet type (and vice versa) for use in
+ * {@link FilterPredicate}s
+ *
+ * This is a bit ugly, but it allows us to provide good error messages at runtime
+ * when there are type mismatches.
+ *
+ * TODO: this has some overlap with {@link PrimitiveTypeName#javaType}
+ * TODO: (https://issues.apache.org/jira/browse/PARQUET-30)
+ */
+public class ValidTypeMap {
+  private ValidTypeMap() { }
+
+  // classToParquetType and parquetTypeToClass are used as a bi-directional map
+  private static final Map<Class<?>, Set<FullTypeDescriptor>> classToParquetType = new HashMap<Class<?>, Set<FullTypeDescriptor>>();
+  private static final Map<FullTypeDescriptor, Set<Class<?>>> parquetTypeToClass = new HashMap<FullTypeDescriptor, Set<Class<?>>>();
+
+  // set up the mapping in both directions
+  private static void add(Class<?> c, FullTypeDescriptor f) {
+    Set<FullTypeDescriptor> descriptors = classToParquetType.get(c);
+    if (descriptors == null) {
+      descriptors = new HashSet<FullTypeDescriptor>();
+      classToParquetType.put(c, descriptors);
+    }
+    descriptors.add(f);
+
+    Set<Class<?>> classes = parquetTypeToClass.get(f);
+    if (classes == null) {
+      classes = new HashSet<Class<?>>();
+      parquetTypeToClass.put(f, classes);
+    }
+    classes.add(c);
+  }
+
+  static {
+    // basic primitive columns
+    add(Integer.class, new FullTypeDescriptor(PrimitiveTypeName.INT32, null));
+    add(Long.class, new FullTypeDescriptor(PrimitiveTypeName.INT64, null));
+    add(Float.class, new FullTypeDescriptor(PrimitiveTypeName.FLOAT, null));
+    add(Double.class, new FullTypeDescriptor(PrimitiveTypeName.DOUBLE, null));
+    add(Boolean.class, new FullTypeDescriptor(PrimitiveTypeName.BOOLEAN, null));
+
+    // Both of these binary types are valid
+    add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.BINARY, null));
+    add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, null));
+
+    add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.BINARY, OriginalType.UTF8));
+    add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, OriginalType.UTF8));
+  }
+
+  /**
+   * Asserts that foundColumn was declared as a type that is compatible with the type for this column found
+   * in the schema of the parquet file.
+   *
+   * @throws java.lang.IllegalArgumentException if the types do not align
+   *
+   * @param foundColumn the column as declared by the user
+   * @param primitiveType the primitive type according to the schema
+   * @param originalType the original type according to the schema
+   */
+  public static <T extends Comparable<T>> void assertTypeValid(Column<T> foundColumn, PrimitiveTypeName primitiveType, OriginalType originalType) {
+    Class<T> foundColumnType = foundColumn.getColumnType();
+    ColumnPath columnPath = foundColumn.getColumnPath();
+
+    Set<FullTypeDescriptor> validTypeDescriptors = classToParquetType.get(foundColumnType);
+    FullTypeDescriptor typeInFileMetaData = new FullTypeDescriptor(primitiveType, originalType);
+
+    if (validTypeDescriptors == null) {
+      StringBuilder message = new StringBuilder();
+      message
+          .append("Column ")
+          .append(columnPath.toDotString())
+          .append(" was declared as type: ")
+          .append(foundColumnType.getName())
+          .append(" which is not supported in FilterPredicates.");
+
+      Set<Class<?>> supportedTypes = parquetTypeToClass.get(typeInFileMetaData);
+      if (supportedTypes != null) {
+        message
+          .append(" Supported types for this column are: ")
+          .append(supportedTypes);
+      } else {
+        message.append(" There are no supported types for columns of " + typeInFileMetaData);
+      }
+      throw new IllegalArgumentException(message.toString());
+    }
+
+    if (!validTypeDescriptors.contains(typeInFileMetaData)) {
+      StringBuilder message = new StringBuilder();
+      message
+          .append("FilterPredicate column: ")
+          .append(columnPath.toDotString())
+          .append("'s declared type (")
+          .append(foundColumnType.getName())
+          .append(") does not match the schema found in file metadata. Column ")
+          .append(columnPath.toDotString())
+          .append(" is of type: ")
+          .append(typeInFileMetaData)
+          .append("\nValid types for this column are: ")
+          .append(parquetTypeToClass.get(typeInFileMetaData));
+      throw new IllegalArgumentException(message.toString());
+    }
+  }
+
+  private static final class FullTypeDescriptor {
+    private final PrimitiveTypeName primitiveType;
+    private final OriginalType originalType;
+
+    private FullTypeDescriptor(PrimitiveTypeName primitiveType, OriginalType originalType) {
+      this.primitiveType = primitiveType;
+      this.originalType = originalType;
+    }
+
+    public PrimitiveTypeName getPrimitiveType() {
+      return primitiveType;
+    }
+
+    public OriginalType getOriginalType() {
+      return originalType;
+    }
+
+    @Override
+    public String toString() {
+      return "FullTypeDescriptor(" + "PrimitiveType: " + primitiveType + ", OriginalType: " + originalType + ')';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      FullTypeDescriptor that = (FullTypeDescriptor) o;
+
+      if (originalType != that.originalType) return false;
+      if (primitiveType != that.primitiveType) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = primitiveType != null ? primitiveType.hashCode() : 0;
+      result = 31 * result + (originalType != null ? originalType.hashCode() : 0);
+      return result;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringGroupConverter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringGroupConverter.java b/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringGroupConverter.java
new file mode 100644
index 0000000..4720854
--- /dev/null
+++ b/parquet-column/src/main/java/parquet/filter2/recordlevel/FilteringGroupConverter.java
@@ -0,0 +1,97 @@
+package parquet.filter2.recordlevel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;
+import parquet.io.PrimitiveColumnIO;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+
+import static parquet.Preconditions.checkArgument;
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * See {@link FilteringRecordMaterializer}
+ */
+public class FilteringGroupConverter extends GroupConverter {
+  // the real converter
+  private final GroupConverter delegate;
+
+  // the path, from the root of the schema, to this converter
+  // used ultimately by the primitive converter proxy to figure
+  // out which column it represents.
+  private final List<Integer> indexFieldPath;
+
+  // for a given column, which nodes in the filter expression need
+  // to be notified of this column's value
+  private final Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn;
+
+  // used to go from our indexFieldPath to the PrimitiveColumnIO for that column
+  private final Map<List<Integer>, PrimitiveColumnIO> columnIOsByIndexFieldPath;
+
+  public FilteringGroupConverter(
+      GroupConverter delegate,
+      List<Integer> indexFieldPath,
+      Map<ColumnPath, List<ValueInspector>> valueInspectorsByColumn, Map<List<Integer>,
+      PrimitiveColumnIO> columnIOsByIndexFieldPath) {
+
+    this.delegate = checkNotNull(delegate, "delegate");
+    this.indexFieldPath = checkNotNull(indexFieldPath, "indexFieldPath");
+    this.columnIOsByIndexFieldPath = checkNotNull(columnIOsByIndexFieldPath, "columnIOsByIndexFieldPath");
+    this.valueInspectorsByColumn = checkNotNull(valueInspectorsByColumn, "valueInspectorsByColumn");
+  }
+
+  // When a converter is asked for, we get the real one from the delegate, then wrap it
+  // in a filtering pass-through proxy.
+  // TODO: making the assumption that getConverter(i) is only called once, is that valid?
+  @Override
+  public Converter getConverter(int fieldIndex) {
+
+    // get the real converter from the delegate
+    Converter delegateConverter = checkNotNull(delegate.getConverter(fieldIndex), "delegate converter");
+
+    // determine the indexFieldPath for the converter proxy we're about to make, which is
+    // this converter's path + the requested fieldIndex
+    List<Integer> newIndexFieldPath = new ArrayList<Integer>(indexFieldPath.size() + 1);
+    newIndexFieldPath.addAll(indexFieldPath);
+    newIndexFieldPath.add(fieldIndex);
+
+    if (delegateConverter.isPrimitive()) {
+      PrimitiveColumnIO columnIO = getColumnIO(newIndexFieldPath);
+      ColumnPath columnPath = ColumnPath.get(columnIO.getColumnDescriptor().getPath());
+      ValueInspector[] valueInspectors = getValueInspectors(columnPath);
+      return new FilteringPrimitiveConverter(delegateConverter.asPrimitiveConverter(), valueInspectors);
+    } else {
+      return new FilteringGroupConverter(delegateConverter.asGroupConverter(), newIndexFieldPath, valueInspectorsByColumn, columnIOsByIndexFieldPath);
+    }
+
+  }
+
+  private PrimitiveColumnIO getColumnIO(List<Integer> indexFieldPath) {
+    PrimitiveColumnIO found = columnIOsByIndexFieldPath.get(indexFieldPath);
+    checkArgument(found != null, "Did not find PrimitiveColumnIO for index field path" + indexFieldPath);
+    return found;
+  }
+
+  private ValueInspector[] getValueInspectors(ColumnPath columnPath) {
+    List<ValueInspector> inspectorsList = valueInspectorsByColumn.get(columnPath);
+    if (inspectorsList == null) {
+      return new ValueInspector[] {};
+    } else {
+      return inspectorsList.toArray(new ValueInspector[inspectorsList.size()]);
+    }
+  }
+
+  @Override
+  public void start() {
+    delegate.start();
+  }
+
+  @Override
+  public void end() {
+    delegate.end();
+  }
+}