Posted to commits@parquet.apache.org by ju...@apache.org on 2014/07/29 23:39:16 UTC

[2/4] Add a unified and optionally more constrained API for expressing filters on columns

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-column/src/test/java/parquet/io/TestFiltered.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/parquet/io/TestFiltered.java b/parquet-column/src/test/java/parquet/io/TestFiltered.java
index 66fe6a0..0107b36 100644
--- a/parquet-column/src/test/java/parquet/io/TestFiltered.java
+++ b/parquet-column/src/test/java/parquet/io/TestFiltered.java
@@ -15,19 +15,6 @@
  */
 package parquet.io;
 
-import static org.junit.Assert.assertEquals;
-import static parquet.example.Paper.r1;
-import static parquet.example.Paper.r2;
-import static parquet.example.Paper.schema;
-import static parquet.filter.AndRecordFilter.and;
-import static parquet.filter.ColumnPredicates.applyFunctionToLong;
-import static parquet.filter.ColumnPredicates.applyFunctionToString;
-import static parquet.filter.ColumnPredicates.equalTo;
-import static parquet.filter.ColumnRecordFilter.column;
-import static parquet.filter.NotRecordFilter.not;
-import static parquet.filter.OrRecordFilter.or;
-import static parquet.filter.PagedRecordFilter.page;
-
 import java.util.ArrayList;
 import java.util.List;
 
@@ -41,8 +28,22 @@ import parquet.example.data.GroupWriter;
 import parquet.example.data.simple.convert.GroupRecordConverter;
 import parquet.filter.ColumnPredicates.LongPredicateFunction;
 import parquet.filter.ColumnPredicates.PredicateFunction;
+import parquet.filter2.compat.FilterCompat;
 import parquet.io.api.RecordMaterializer;
 
+import static org.junit.Assert.assertEquals;
+import static parquet.example.Paper.r1;
+import static parquet.example.Paper.r2;
+import static parquet.example.Paper.schema;
+import static parquet.filter.AndRecordFilter.and;
+import static parquet.filter.ColumnPredicates.applyFunctionToLong;
+import static parquet.filter.ColumnPredicates.applyFunctionToString;
+import static parquet.filter.ColumnPredicates.equalTo;
+import static parquet.filter.ColumnRecordFilter.column;
+import static parquet.filter.NotRecordFilter.not;
+import static parquet.filter.OrRecordFilter.or;
+import static parquet.filter.PagedRecordFilter.page;
+
 public class TestFiltered {
 
   /* Class that implements applyFunction filter for long. Checks for long greater than 15. */
@@ -84,15 +85,14 @@ public class TestFiltered {
     // Get first record
     RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
     RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
-        columnIO.getRecordReader(memPageStore, recordConverter,
-            column("DocId", equalTo(10l)));
+        columnIO.getRecordReader(memPageStore, recordConverter, FilterCompat.get(column("DocId", equalTo(10l))));
 
     readOne(recordReader, "r2 filtered out", r1);
 
     // Get second record
     recordReader = (RecordReaderImplementation<Group>)
         columnIO.getRecordReader(memPageStore, recordConverter,
-            column("DocId", equalTo(20l)));
+            FilterCompat.get(column("DocId", equalTo(20l))));
 
     readOne(recordReader, "r1 filtered out", r2);
 
@@ -107,14 +107,14 @@ public class TestFiltered {
     RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
     RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
         columnIO.getRecordReader(memPageStore, recordConverter,
-            column("DocId", equalTo(10l)));
+            FilterCompat.get(column("DocId", equalTo(10l))));
 
     readOne(recordReader, "r2 filtered out", r1);
 
     // Get second record
     recordReader = (RecordReaderImplementation<Group>)
         columnIO.getRecordReader(memPageStore, recordConverter,
-            column("DocId", applyFunctionToLong (new LongGreaterThan15Predicate())));
+            FilterCompat.get(column("DocId", applyFunctionToLong(new LongGreaterThan15Predicate()))));
 
     readOne(recordReader, "r1 filtered out", r2);
   }
@@ -128,7 +128,7 @@ public class TestFiltered {
     RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
     RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
         columnIO.getRecordReader(memPageStore, recordConverter,
-            column("Name.Url", equalTo("http://A")));
+            FilterCompat.get(column("Name.Url", equalTo("http://A"))));
 
     readOne(recordReader, "r2 filtered out", r1);
 
@@ -136,7 +136,7 @@ public class TestFiltered {
     // against the first instance of a
     recordReader = (RecordReaderImplementation<Group>)
         columnIO.getRecordReader(memPageStore, recordConverter,
-            column("Name.Url", equalTo("http://B")));
+            FilterCompat.get(column("Name.Url", equalTo("http://B"))));
 
     List<Group> all = readAll(recordReader);
     assertEquals("There should be no matching records: " + all , 0, all.size());
@@ -144,7 +144,7 @@ public class TestFiltered {
     // Finally try matching against the C url in record 2
     recordReader = (RecordReaderImplementation<Group>)
         columnIO.getRecordReader(memPageStore, recordConverter,
-            column("Name.Url", equalTo("http://C")));
+            FilterCompat.get(column("Name.Url", equalTo("http://C"))));
 
     readOne(recordReader, "r1 filtered out", r2);
 
@@ -159,7 +159,7 @@ public class TestFiltered {
     RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
     RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
         columnIO.getRecordReader(memPageStore, recordConverter,
-            column("Name.Url", applyFunctionToString (new StringEndsWithAPredicate ())));
+            FilterCompat.get(column("Name.Url", applyFunctionToString(new StringEndsWithAPredicate()))));
 
     readOne(recordReader, "r2 filtered out", r1);
 
@@ -167,7 +167,7 @@ public class TestFiltered {
     // against the first instance of a
     recordReader = (RecordReaderImplementation<Group>)
         columnIO.getRecordReader(memPageStore, recordConverter,
-            column("Name.Url", equalTo("http://B")));
+            FilterCompat.get(column("Name.Url", equalTo("http://B"))));
 
     List<Group> all = readAll(recordReader);
     assertEquals("There should be no matching records: " + all , 0, all.size());
@@ -175,7 +175,7 @@ public class TestFiltered {
     // Finally try matching against the C url in record 2
     recordReader = (RecordReaderImplementation<Group>)
         columnIO.getRecordReader(memPageStore, recordConverter,
-            column("Name.Url", equalTo("http://C")));
+            FilterCompat.get(column("Name.Url", equalTo("http://C"))));
 
     readOne(recordReader, "r1 filtered out", r2);
 
@@ -189,7 +189,7 @@ public class TestFiltered {
     RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
     RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
         columnIO.getRecordReader(memPageStore, recordConverter,
-                                 page(4, 4));
+            FilterCompat.get(page(4, 4)));
 
     List<Group> all = readAll(recordReader);
     assertEquals("expecting records " + all, 4, all.size());
@@ -206,7 +206,7 @@ public class TestFiltered {
     RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
     RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
         columnIO.getRecordReader(memPageStore, recordConverter,
-            and(column("DocId", equalTo(10l)), page(2, 4)));
+            FilterCompat.get(and(column("DocId", equalTo(10l)), page(2, 4))));
 
     List<Group> all = readAll(recordReader);
     assertEquals("expecting 4 records " + all, 4, all.size());
@@ -224,8 +224,8 @@ public class TestFiltered {
     RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
     RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
         columnIO.getRecordReader(memPageStore, recordConverter,
-            or(column("DocId", equalTo(10l)),
-                column("DocId", equalTo(20l))));
+            FilterCompat.get(or(column("DocId", equalTo(10l)),
+                column("DocId", equalTo(20l)))));
 
     List<Group> all = readAll(recordReader);
     assertEquals("expecting 8 records " + all, 16, all.size());
@@ -243,7 +243,7 @@ public class TestFiltered {
     RecordMaterializer<Group> recordConverter = new GroupRecordConverter(schema);
     RecordReaderImplementation<Group> recordReader = (RecordReaderImplementation<Group>)
         columnIO.getRecordReader(memPageStore, recordConverter,
-            not(column("DocId", equalTo(10l))));
+            FilterCompat.get(not(column("DocId", equalTo(10l)))));
 
     List<Group> all = readAll(recordReader);
     assertEquals("expecting 8 records " + all, 8, all.size());

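The change to this test shows the compatibility pattern used throughout this patch: the old filter1-style UnboundRecordFilters are no longer passed to getRecordReader directly but are first wrapped in a FilterCompat.Filter. A minimal before/after sketch, using the same names as the diff above:

    // before: pass the UnboundRecordFilter directly
    recordReader = (RecordReaderImplementation<Group>)
        columnIO.getRecordReader(memPageStore, recordConverter,
            column("DocId", equalTo(10l)));

    // after: wrap the same filter in a FilterCompat.Filter
    recordReader = (RecordReaderImplementation<Group>)
        columnIO.getRecordReader(memPageStore, recordConverter,
            FilterCompat.get(column("DocId", equalTo(10l))));
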
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-common/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-common/pom.xml b/parquet-common/pom.xml
index 9abc8af..02abcad 100644
--- a/parquet-common/pom.xml
+++ b/parquet-common/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-common/src/main/java/parquet/Closeables.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/parquet/Closeables.java b/parquet-common/src/main/java/parquet/Closeables.java
new file mode 100644
index 0000000..9d4c213
--- /dev/null
+++ b/parquet-common/src/main/java/parquet/Closeables.java
@@ -0,0 +1,37 @@
+package parquet;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Utility for working with {@link java.io.Closeable}s
+ */
+public final class Closeables {
+  private Closeables() { }
+
+  private static final Log LOG = Log.getLog(Closeables.class);
+
+  /**
+   * Closes a (potentially null) closeable.
+   * @param c can be null
+   * @throws IOException if c.close() throws an IOException.
+   */
+  public static void close(Closeable c) throws IOException {
+    if (c == null) { return; }
+    c.close();
+  }
+
+  /**
+   * Closes a (potentially null) closeable, swallowing any IOExceptions thrown by
+   * c.close(). The exception will be logged.
+   * @param c can be null
+   */
+  public static void closeAndSwallowIOExceptions(Closeable c) {
+    if (c == null) { return; }
+    try {
+      c.close();
+    } catch (IOException e) {
+      LOG.warn("Encountered exception closing closeable", e);
+    }
+  }
+}

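The two helpers differ only in how close() failures are surfaced; a small usage sketch (the FileInputStream here is just a stand-in resource for illustration):

    FileInputStream in = null;
    try {
      in = new FileInputStream("example.parquet");
      // ... read from the stream ...
    } finally {
      // Closeables.close(in) would rethrow any IOException from in.close()
      // (and is a no-op when in is null); here we log and swallow instead:
      Closeables.closeAndSwallowIOExceptions(in);
    }
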
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-common/src/main/java/parquet/common/internal/Canonicalizer.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/parquet/common/internal/Canonicalizer.java b/parquet-common/src/main/java/parquet/common/internal/Canonicalizer.java
new file mode 100644
index 0000000..3cea532
--- /dev/null
+++ b/parquet-common/src/main/java/parquet/common/internal/Canonicalizer.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2014 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.common.internal;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * returns canonical representation of objects (similar to String.intern()) to save memory
+ * if a.equals(b) then canonicalize(a) == canonicalize(b)
+ * this class is thread safe
+ * @author Julien Le Dem
+ *
+ * @param <T>
+ */
+public class Canonicalizer<T> {
+
+  private ConcurrentHashMap<T, T> canonicals = new ConcurrentHashMap<T, T>();
+
+  /**
+   * @param value the value to canonicalize
+   * @return the corresponding canonical value
+   */
+  final public T canonicalize(T value) {
+    T canonical = canonicals.get(value);
+    if (canonical == null) {
+      value = toCanonical(value);
+      T existing = canonicals.putIfAbsent(value, value);
+      // putIfAbsent is atomic, making sure we always return the same canonical representation of the value
+      if (existing == null) {
+        canonical = value;
+      } else {
+        canonical = existing;
+      }
+    }
+    return canonical;
+  }
+
+  /**
+   * @param value the value to canonicalize if needed
+   * @return the canonicalized value
+   */
+  protected T toCanonical(T value) {
+    return value;
+  }
+}
+

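A small usage sketch of the canonicalizer (String is used as the type parameter only for illustration; ColumnPath below subclasses it to intern path segments):

    Canonicalizer<String> strings = new Canonicalizer<String>();

    String a = strings.canonicalize(new String("DocId"));
    String b = strings.canonicalize(new String("DocId"));

    // equal values map to the same canonical instance
    assert a == b;
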
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-common/src/main/java/parquet/common/schema/ColumnPath.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/parquet/common/schema/ColumnPath.java b/parquet-common/src/main/java/parquet/common/schema/ColumnPath.java
new file mode 100644
index 0000000..f3ded9c
--- /dev/null
+++ b/parquet-common/src/main/java/parquet/common/schema/ColumnPath.java
@@ -0,0 +1,96 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.common.schema;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import parquet.common.internal.Canonicalizer;
+
+import static parquet.Preconditions.checkNotNull;
+
+public final class ColumnPath implements Iterable<String>, Serializable {
+
+  private static Canonicalizer<ColumnPath> paths = new Canonicalizer<ColumnPath>() {
+    @Override
+    protected ColumnPath toCanonical(ColumnPath value) {
+      String[] path = new String[value.p.length];
+      for (int i = 0; i < value.p.length; i++) {
+        path[i] = value.p[i].intern();
+      }
+      return new ColumnPath(path);
+    }
+  };
+
+  public static ColumnPath fromDotString(String path) {
+    checkNotNull(path, "path");
+    return get(path.split("\\."));
+  }
+
+  public static ColumnPath get(String... path){
+    return paths.canonicalize(new ColumnPath(path));
+  }
+
+  private final String[] p;
+
+  private ColumnPath(String[] path) {
+    this.p = path;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof ColumnPath) {
+      return Arrays.equals(p, ((ColumnPath)obj).p);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(p);
+  }
+
+  public String toDotString() {
+    Iterator<String> iter = Arrays.asList(p).iterator();
+    StringBuilder sb = new StringBuilder();
+    while (iter.hasNext()) {
+      sb.append(iter.next());
+      if (iter.hasNext()) {
+        sb.append('.');
+      }
+    }
+    return sb.toString();
+  }
+
+  @Override
+  public String toString() {
+    return Arrays.toString(p);
+  }
+
+  @Override
+  public Iterator<String> iterator() {
+    return Arrays.asList(p).iterator();
+  }
+
+  public int size() {
+    return p.length;
+  }
+
+  public String[] toArray() {
+    return p;
+  }
+}

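Because both factory methods go through the Canonicalizer above, equal paths resolve to the same instance; for example:

    ColumnPath p1 = ColumnPath.get("Name", "Url");
    ColumnPath p2 = ColumnPath.fromDotString("Name.Url");

    assert p1.equals(p2);                        // Arrays.equals on the segments
    assert p1 == p2;                             // canonicalized to a single instance
    assert p1.toDotString().equals("Name.Url");
    assert p1.size() == 2;
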
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-encoding/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-encoding/pom.xml b/parquet-encoding/pom.xml
index 6da2ea6..5840ca5 100644
--- a/parquet-encoding/pom.xml
+++ b/parquet-encoding/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-generator/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-generator/pom.xml b/parquet-generator/pom.xml
index 7afc41b..1ec4a50 100644
--- a/parquet-generator/pom.xml
+++ b/parquet-generator/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-generator/src/main/java/parquet/encoding/Generator.java
----------------------------------------------------------------------
diff --git a/parquet-generator/src/main/java/parquet/encoding/Generator.java b/parquet-generator/src/main/java/parquet/encoding/Generator.java
index 58d9382..b6aa596 100644
--- a/parquet-generator/src/main/java/parquet/encoding/Generator.java
+++ b/parquet-generator/src/main/java/parquet/encoding/Generator.java
@@ -19,7 +19,7 @@ import parquet.encoding.bitpacking.ByteBasedBitPackingGenerator;
 import parquet.encoding.bitpacking.IntBasedBitPackingGenerator;
 
 /**
- * main class for code generation hook in build
+ * main class for code generation hook in build for encodings generation
  *
  * @author Julien Le Dem
  *

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-generator/src/main/java/parquet/filter2/Generator.java
----------------------------------------------------------------------
diff --git a/parquet-generator/src/main/java/parquet/filter2/Generator.java b/parquet-generator/src/main/java/parquet/filter2/Generator.java
new file mode 100644
index 0000000..9818218
--- /dev/null
+++ b/parquet-generator/src/main/java/parquet/filter2/Generator.java
@@ -0,0 +1,10 @@
+package parquet.filter2;
+
+/**
+ * main class for code generation hook in build for filter2 generation
+ */
+public class Generator {
+  public static void main(String[] args) throws Exception {
+    IncrementallyUpdatedFilterPredicateGenerator.main(args);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-generator/src/main/java/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
----------------------------------------------------------------------
diff --git a/parquet-generator/src/main/java/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java b/parquet-generator/src/main/java/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
new file mode 100644
index 0000000..e0f08e4
--- /dev/null
+++ b/parquet-generator/src/main/java/parquet/filter2/IncrementallyUpdatedFilterPredicateGenerator.java
@@ -0,0 +1,251 @@
+package parquet.filter2;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+
+public class IncrementallyUpdatedFilterPredicateGenerator {
+
+  public static void main(String[] args) throws IOException {
+    File srcFile = new File(args[0] + "/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilder.java");
+    srcFile = srcFile.getAbsoluteFile();
+    File parent = srcFile.getParentFile();
+    if (!parent.exists()) {
+      if (!parent.mkdirs()) {
+        throw new IOException("Couldn't mkdirs for " + parent);
+      }
+    }
+    new IncrementallyUpdatedFilterPredicateGenerator(srcFile).run();
+  }
+
+  private final FileWriter writer;
+
+  public IncrementallyUpdatedFilterPredicateGenerator(File file) throws IOException {
+    this.writer = new FileWriter(file);
+  }
+
+  private static class TypeInfo {
+    public final String className;
+    public final String primitiveName;
+    public final boolean useComparable;
+    public final boolean supportsInequality;
+
+    private TypeInfo(String className, String primitiveName, boolean useComparable, boolean supportsInequality) {
+      this.className = className;
+      this.primitiveName = primitiveName;
+      this.useComparable = useComparable;
+      this.supportsInequality = supportsInequality;
+    }
+  }
+
+  private static final TypeInfo[] TYPES = new TypeInfo[]{
+    new TypeInfo("Integer", "int", false, true),
+    new TypeInfo("Long", "long", false, true),
+    new TypeInfo("Boolean", "boolean", false, false),
+    new TypeInfo("Float", "float", false, true),
+    new TypeInfo("Double", "double", false, true),
+    new TypeInfo("Binary", "Binary", true, true),
+  };
+
+  public void run() throws IOException {
+    add("package parquet.filter2.recordlevel;\n" +
+        "\n" +
+        "import parquet.common.schema.ColumnPath;\n" +
+        "import parquet.filter2.predicate.Operators.Eq;\n" +
+        "import parquet.filter2.predicate.Operators.Gt;\n" +
+        "import parquet.filter2.predicate.Operators.GtEq;\n" +
+        "import parquet.filter2.predicate.Operators.LogicalNotUserDefined;\n" +
+        "import parquet.filter2.predicate.Operators.Lt;\n" +
+        "import parquet.filter2.predicate.Operators.LtEq;\n" +
+        "import parquet.filter2.predicate.Operators.NotEq;\n" +
+        "import parquet.filter2.predicate.Operators.UserDefined;\n" +
+        "import parquet.filter2.predicate.UserDefinedPredicate;\n" +
+        "import parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector;\n" +
+        "import parquet.io.api.Binary;\n\n" +
+        "/**\n" +
+        " * This class is auto-generated by {@link parquet.filter2.IncrementallyUpdatedFilterPredicateGenerator}\n" +
+        " * Do not manually edit!\n" +
+        " * See {@link IncrementallyUpdatedFilterPredicateBuilderBase}\n" +
+        " */\n");
+
+    add("public class IncrementallyUpdatedFilterPredicateBuilder extends IncrementallyUpdatedFilterPredicateBuilderBase {\n\n");
+
+    addVisitBegin("Eq");
+    for (TypeInfo info : TYPES) {
+      addEqNotEqCase(info, true);
+    }
+    addVisitEnd();
+
+    addVisitBegin("NotEq");
+    for (TypeInfo info : TYPES) {
+      addEqNotEqCase(info, false);
+    }
+    addVisitEnd();
+
+    addVisitBegin("Lt");
+    for (TypeInfo info : TYPES) {
+      addInequalityCase(info, "<");
+    }
+    addVisitEnd();
+
+    addVisitBegin("LtEq");
+    for (TypeInfo info : TYPES) {
+      addInequalityCase(info, "<=");
+    }
+    addVisitEnd();
+
+    addVisitBegin("Gt");
+    for (TypeInfo info : TYPES) {
+      addInequalityCase(info, ">");
+    }
+    addVisitEnd();
+
+    addVisitBegin("GtEq");
+    for (TypeInfo info : TYPES) {
+      addInequalityCase(info, ">=");
+    }
+    addVisitEnd();
+
+    add("  @Override\n" +
+        "  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> IncrementallyUpdatedFilterPredicate visit(UserDefined<T, U> pred) {\n");
+    addUdpBegin();
+    for (TypeInfo info : TYPES) {
+      addUdpCase(info, false);
+    }
+    addVisitEnd();
+
+    add("  @Override\n" +
+        "  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> IncrementallyUpdatedFilterPredicate visit(LogicalNotUserDefined<T, U> notPred) {\n" +
+        "    UserDefined<T, U> pred = notPred.getUserDefined();\n");
+    addUdpBegin();
+    for (TypeInfo info : TYPES) {
+      addUdpCase(info, true);
+    }
+    addVisitEnd();
+
+    add("}\n");
+    writer.close();
+  }
+
+  private void addVisitBegin(String inVar) throws IOException {
+    add("  @Override\n" +
+        "  public <T extends Comparable<T>> IncrementallyUpdatedFilterPredicate visit(" + inVar + "<T> pred) {\n" +
+        "    ColumnPath columnPath = pred.getColumn().getColumnPath();\n" +
+        "    Class<T> clazz = pred.getColumn().getColumnType();\n" +
+        "\n" +
+        "    ValueInspector valueInspector = null;\n\n");
+  }
+
+  private void addVisitEnd() throws IOException {
+    add("    if (valueInspector == null) {\n" +
+        "      throw new IllegalArgumentException(\"Encountered unknown type \" + clazz);\n" +
+        "    }\n" +
+        "\n" +
+        "    addValueInspector(columnPath, valueInspector);\n" +
+        "    return valueInspector;\n" +
+        "  }\n\n");
+  }
+
+  private void addEqNotEqCase(TypeInfo info, boolean isEq) throws IOException {
+    add("    if (clazz.equals(" + info.className + ".class)) {\n" +
+        "      if (pred.getValue() == null) {\n" +
+        "        valueInspector = new ValueInspector() {\n" +
+        "          @Override\n" +
+        "          public void updateNull() {\n" +
+        "            setResult(" + isEq + ");\n" +
+        "          }\n" +
+        "\n" +
+        "          @Override\n" +
+        "          public void update(" + info.primitiveName + " value) {\n" +
+        "            setResult(" + !isEq + ");\n" +
+        "          }\n" +
+        "        };\n" +
+        "      } else {\n" +
+        "        final " + info.primitiveName + " target = (" + info.className + ") (Object) pred.getValue();\n" +
+        "\n" +
+        "        valueInspector = new ValueInspector() {\n" +
+        "          @Override\n" +
+        "          public void updateNull() {\n" +
+        "            setResult(" + !isEq +");\n" +
+        "          }\n" +
+        "\n" +
+        "          @Override\n" +
+        "          public void update(" + info.primitiveName + " value) {\n");
+
+    if (info.useComparable) {
+      add("            setResult(" + compareEquality("value", "target", isEq) + ");\n");
+    } else {
+      add("            setResult(" + (isEq ? "value == target" : "value != target" )  + ");\n");
+    }
+
+    add("          }\n" +
+        "        };\n" +
+        "      }\n" +
+        "    }\n\n");
+  }
+
+  private void addInequalityCase(TypeInfo info, String op) throws IOException {
+    if (!info.supportsInequality) {
+      add("    if (clazz.equals(" + info.className + ".class)) {\n");
+      add("      throw new IllegalArgumentException(\"Operator " + op + " not supported for " + info.className + "\");\n");
+      add("    }\n\n");
+      return;
+    }
+
+    add("    if (clazz.equals(" + info.className + ".class)) {\n" +
+        "      final " + info.primitiveName + " target = (" + info.className + ") (Object) pred.getValue();\n" +
+        "\n" +
+        "      valueInspector = new ValueInspector() {\n" +
+        "        @Override\n" +
+        "        public void updateNull() {\n" +
+        "          setResult(false);\n" +
+        "        }\n" +
+        "\n" +
+        "        @Override\n" +
+        "        public void update(" + info.primitiveName + " value) {\n");
+
+    if (info.useComparable) {
+      add("          setResult(value.compareTo(target) " + op + " 0);\n");
+    } else {
+      add("          setResult(value " + op + " target);\n");
+    }
+    add("        }\n" +
+        "      };\n" +
+        "    }\n\n");
+  }
+
+  private void addUdpBegin() throws IOException {
+    add("    ColumnPath columnPath = pred.getColumn().getColumnPath();\n" +
+        "    Class<T> clazz = pred.getColumn().getColumnType();\n" +
+        "\n" +
+        "    ValueInspector valueInspector = null;\n" +
+        "\n" +
+        "    final U udp = pred.getUserDefinedPredicate();\n" +
+        "\n");
+  }
+
+  private void addUdpCase(TypeInfo info, boolean invert)throws IOException {
+    add("    if (clazz.equals(" + info.className + ".class)) {\n" +
+        "      valueInspector = new ValueInspector() {\n" +
+        "        @Override\n" +
+        "        public void updateNull() {\n" +
+        "          setResult(" + (invert ? "!" : "") + "udp.keep(null));\n" +
+        "        }\n" +
+        "\n" +
+        "        @SuppressWarnings(\"unchecked\")\n" +
+        "        @Override\n" +
+        "        public void update(" + info.primitiveName + " value) {\n" +
+        "          setResult(" + (invert ? "!" : "") + "udp.keep((T) (Object) value));\n" +
+        "        }\n" +
+        "      };\n" +
+        "    }\n\n");
+  }
+
+  private String compareEquality(String var, String target, boolean eq) {
+    return var + ".compareTo(" + target + ")" + (eq ? " == 0 " : " != 0");
+  }
+
+  private void add(String s) throws IOException {
+    writer.write(s);
+  }
+}

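To make the generator concrete, the following is roughly what the generated visit(Eq) method looks like, assembled from addVisitBegin, addEqNotEqCase and addVisitEnd (illustrative only, showing just the Long case; the real file is produced at build time):

    @Override
    public <T extends Comparable<T>> IncrementallyUpdatedFilterPredicate visit(Eq<T> pred) {
      ColumnPath columnPath = pred.getColumn().getColumnPath();
      Class<T> clazz = pred.getColumn().getColumnType();

      ValueInspector valueInspector = null;

      // ... cases for Integer, Boolean, Float, Double and Binary elided ...

      if (clazz.equals(Long.class)) {
        if (pred.getValue() == null) {
          // eq(column, null) keeps exactly the null values
          valueInspector = new ValueInspector() {
            @Override
            public void updateNull() { setResult(true); }

            @Override
            public void update(long value) { setResult(false); }
          };
        } else {
          final long target = (Long) (Object) pred.getValue();

          valueInspector = new ValueInspector() {
            @Override
            public void updateNull() { setResult(false); }

            @Override
            public void update(long value) { setResult(value == target); }
          };
        }
      }

      if (valueInspector == null) {
        throw new IllegalArgumentException("Encountered unknown type " + clazz);
      }

      addValueInspector(columnPath, valueInspector);
      return valueInspector;
    }
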
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hadoop-bundle/pom.xml b/parquet-hadoop-bundle/pom.xml
index c3caa2f..ae89233 100644
--- a/parquet-hadoop-bundle/pom.xml
+++ b/parquet-hadoop-bundle/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index c1b4f07..d074b0c 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -3,7 +3,7 @@
     <groupId>com.twitter</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.5.1-SNAPSHOT</version>
+    <version>1.6.0-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
@@ -78,9 +78,11 @@
 
   <build>
     <plugins>
+<!-- turned off temporarily, must be turned back on after 1.6.0 is released.
       <plugin>
         <artifactId>maven-enforcer-plugin</artifactId>
       </plugin>
+-->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-jar-plugin</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java
new file mode 100644
index 0000000..4da9821
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/filter2/compat/RowGroupFilter.java
@@ -0,0 +1,63 @@
+package parquet.filter2.compat;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import parquet.filter2.compat.FilterCompat.Filter;
+import parquet.filter2.compat.FilterCompat.NoOpFilter;
+import parquet.filter2.compat.FilterCompat.Visitor;
+import parquet.filter2.predicate.FilterPredicate;
+import parquet.filter2.predicate.SchemaCompatibilityValidator;
+import parquet.filter2.statisticslevel.StatisticsFilter;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.schema.MessageType;
+
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * Given a {@link Filter} applies it to a list of BlockMetaData (row groups)
+ * If the Filter is an {@link parquet.filter.UnboundRecordFilter} or the no op filter,
+ * no filtering will be performed.
+ */
+public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
+  private final List<BlockMetaData> blocks;
+  private final MessageType schema;
+
+  public static List<BlockMetaData> filterRowGroups(Filter filter, List<BlockMetaData> blocks, MessageType schema) {
+    checkNotNull(filter, "filter");
+    return filter.accept(new RowGroupFilter(blocks, schema));
+  }
+
+  private RowGroupFilter(List<BlockMetaData> blocks, MessageType schema) {
+    this.blocks = checkNotNull(blocks, "blocks");
+    this.schema = checkNotNull(schema, "schema");
+  }
+
+  @Override
+  public List<BlockMetaData> visit(FilterCompat.FilterPredicateCompat filterPredicateCompat) {
+    FilterPredicate filterPredicate = filterPredicateCompat.getFilterPredicate();
+
+    // check that the schema of the filter matches the schema of the file
+    SchemaCompatibilityValidator.validate(filterPredicate, schema);
+
+    List<BlockMetaData> filteredBlocks = new ArrayList<BlockMetaData>();
+
+    for (BlockMetaData block : blocks) {
+      if (!StatisticsFilter.canDrop(filterPredicate, block.getColumns())) {
+        filteredBlocks.add(block);
+      }
+    }
+
+    return filteredBlocks;
+  }
+
+  @Override
+  public List<BlockMetaData> visit(FilterCompat.UnboundRecordFilterCompat unboundRecordFilterCompat) {
+    return blocks;
+  }
+
+  @Override
+  public List<BlockMetaData> visit(NoOpFilter noOpFilter) {
+    return blocks;
+  }
+}

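A hedged sketch of how this visitor is typically invoked. The FilterApi predicate builders are introduced elsewhere in this patch series, and blocks/fileSchema are assumed to come from the file footer:

    // keep only records whose DocId equals 10
    FilterPredicate pred = FilterApi.eq(FilterApi.longColumn("DocId"), 10L);

    // drop any row group whose statistics prove it cannot contain a match
    List<BlockMetaData> kept =
        RowGroupFilter.filterRowGroups(FilterCompat.get(pred), blocks, fileSchema);
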
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java
new file mode 100644
index 0000000..4daed5a
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/filter2/statisticslevel/StatisticsFilter.java
@@ -0,0 +1,244 @@
+package parquet.filter2.statisticslevel;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import parquet.column.statistics.Statistics;
+import parquet.common.schema.ColumnPath;
+import parquet.filter2.predicate.FilterPredicate;
+import parquet.filter2.predicate.Operators.And;
+import parquet.filter2.predicate.Operators.Column;
+import parquet.filter2.predicate.Operators.Eq;
+import parquet.filter2.predicate.Operators.Gt;
+import parquet.filter2.predicate.Operators.GtEq;
+import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
+import parquet.filter2.predicate.Operators.Lt;
+import parquet.filter2.predicate.Operators.LtEq;
+import parquet.filter2.predicate.Operators.Not;
+import parquet.filter2.predicate.Operators.NotEq;
+import parquet.filter2.predicate.Operators.Or;
+import parquet.filter2.predicate.Operators.UserDefined;
+import parquet.filter2.predicate.UserDefinedPredicate;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import static parquet.Preconditions.checkArgument;
+import static parquet.Preconditions.checkNotNull;
+
+/**
+ * Applies a {@link parquet.filter2.predicate.FilterPredicate} to statistics about a group of
+ * records.
+ *
+ * Note: the supplied predicate must not contain any instances of the not() operator as this is not
+ * supported by this filter.
+ *
+ * the supplied predicate should first be run through {@link parquet.filter2.predicate.LogicalInverseRewriter} to rewrite it
+ * in a form that doesn't make use of the not() operator.
+ *
+ * the supplied predicate should also have already been run through
+ * {@link parquet.filter2.predicate.SchemaCompatibilityValidator}
+ * to make sure it is compatible with the schema of this file.
+ *
+ * Returns true if all the records represented by the statistics in the provided column metadata can be dropped.
+ *         false otherwise (including when it is not known, which is often the case).
+ */
+// TODO: this belongs in the parquet-column project, but some of the classes here need to be moved too
+// TODO: (https://issues.apache.org/jira/browse/PARQUET-38)
+public class StatisticsFilter implements FilterPredicate.Visitor<Boolean> {
+
+  public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns) {
+    checkNotNull(pred, "pred");
+    checkNotNull(columns, "columns");
+    return pred.accept(new StatisticsFilter(columns));
+  }
+
+  private final Map<ColumnPath, ColumnChunkMetaData> columns = new HashMap<ColumnPath, ColumnChunkMetaData>();
+
+  private StatisticsFilter(List<ColumnChunkMetaData> columnsList) {
+    for (ColumnChunkMetaData chunk : columnsList) {
+      columns.put(chunk.getPath(), chunk);
+    }
+  }
+
+  private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
+    ColumnChunkMetaData c = columns.get(columnPath);
+    checkArgument(c != null, "Column " + columnPath.toDotString() + " not found in schema!");
+    return c;
+  }
+
+  // is this column chunk composed entirely of nulls?
+  private boolean isAllNulls(ColumnChunkMetaData column) {
+    return column.getStatistics().getNumNulls() == column.getValueCount();
+  }
+
+  // are there any nulls in this column chunk?
+  private boolean hasNulls(ColumnChunkMetaData column) {
+    return column.getStatistics().getNumNulls() > 0;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Eq<T> eq) {
+    Column<T> filterColumn = eq.getColumn();
+    T value = eq.getValue();
+    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+
+    if (value == null) {
+      // we are looking for records where v eq(null)
+      // so drop if there are no nulls in this chunk
+      return !hasNulls(columnChunk);
+    }
+
+    if (isAllNulls(columnChunk)) {
+      // we are looking for records where v eq(someNonNull)
+      // and this is a column of all nulls, so drop it
+      return true;
+    }
+
+    Statistics<T> stats = columnChunk.getStatistics();
+
+    // drop if value < min || value > max
+    return value.compareTo(stats.genericGetMin()) < 0 || value.compareTo(stats.genericGetMax()) > 0;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(NotEq<T> notEq) {
+    Column<T> filterColumn = notEq.getColumn();
+    T value = notEq.getValue();
+    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+
+    if (value == null) {
+      // we are looking for records where v notEq(null)
+      // so, if this is a column of all nulls, we can drop it
+      return isAllNulls(columnChunk);
+    }
+
+    if (hasNulls(columnChunk)) {
+      // we are looking for records where v notEq(someNonNull)
+      // but this chunk contains nulls, we cannot drop it
+      return false;
+    }
+
+    Statistics<T> stats = columnChunk.getStatistics();
+
+    // drop if this is a column where min = max = value
+    return value.compareTo(stats.genericGetMin()) == 0 && value.compareTo(stats.genericGetMax()) == 0;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Lt<T> lt) {
+    Column<T> filterColumn = lt.getColumn();
+    T value = lt.getValue();
+    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+
+    if (isAllNulls(columnChunk)) {
+      // we are looking for records where v < someValue
+      // this chunk is all nulls, so we can drop it
+      return true;
+    }
+
+    Statistics<T> stats = columnChunk.getStatistics();
+
+    // drop if value <= min
+    return  value.compareTo(stats.genericGetMin()) <= 0;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(LtEq<T> ltEq) {
+    Column<T> filterColumn = ltEq.getColumn();
+    T value = ltEq.getValue();
+    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+
+    if (isAllNulls(columnChunk)) {
+      // we are looking for records where v <= someValue
+      // this chunk is all nulls, so we can drop it
+      return true;
+    }
+
+    Statistics<T> stats = columnChunk.getStatistics();
+
+    // drop if value < min
+    return value.compareTo(stats.genericGetMin()) < 0;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Gt<T> gt) {
+    Column<T> filterColumn = gt.getColumn();
+    T value = gt.getValue();
+    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+
+    if (isAllNulls(columnChunk)) {
+      // we are looking for records where v > someValue
+      // this chunk is all nulls, so we can drop it
+      return true;
+    }
+
+    Statistics<T> stats = columnChunk.getStatistics();
+
+    // drop if value >= max
+    return value.compareTo(stats.genericGetMax()) >= 0;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(GtEq<T> gtEq) {
+    Column<T> filterColumn = gtEq.getColumn();
+    T value = gtEq.getValue();
+    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+
+    if (isAllNulls(columnChunk)) {
+      // we are looking for records where v >= someValue
+      // this chunk is all nulls, so we can drop it
+      return true;
+    }
+
+    Statistics<T> stats = columnChunk.getStatistics();
+
+    // drop if value > max
+    return value.compareTo(stats.genericGetMax()) > 0;
+  }
+
+  @Override
+  public Boolean visit(And and) {
+    return and.getLeft().accept(this) && and.getRight().accept(this);
+  }
+
+  @Override
+  public Boolean visit(Or or) {
+    // seems unintuitive to put an && not an || here
+    // but we can only drop a chunk of records if we know that
+    // both the left and right predicates agree that no matter what
+    // we don't need this chunk.
+    return or.getLeft().accept(this) && or.getRight().accept(this);
+  }
+
+  @Override
+  public Boolean visit(Not not) {
+    throw new IllegalArgumentException(
+        "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
+  }
+
+  private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> ud, boolean inverted) {
+    Column<T> filterColumn = ud.getColumn();
+    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
+    U udp = ud.getUserDefinedPredicate();
+    Statistics<T> stats = columnChunk.getStatistics();
+    parquet.filter2.predicate.Statistics<T> udpStats =
+        new parquet.filter2.predicate.Statistics<T>(stats.genericGetMin(), stats.genericGetMax());
+
+    if (inverted) {
+      return udp.inverseCanDrop(udpStats);
+    } else {
+      return udp.canDrop(udpStats);
+    }
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> ud) {
+    return visit(ud, false);
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(LogicalNotUserDefined<T, U> lnud) {
+    return visit(lnud.getUserDefined(), true);
+  }
+
+}

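To make the Eq/NotEq cases concrete: for a DocId chunk whose statistics report min = 20, max = 40 and numNulls = 0, eq(DocId, 10) allows the chunk to be dropped (10 < min), while notEq(DocId, 10) does not (min and max are not both 10). A minimal call sketch, assuming pred and block are already in scope:

    // pred has already been rewritten by LogicalInverseRewriter and validated
    // against the file schema; block is the BlockMetaData of one row group
    boolean drop = StatisticsFilter.canDrop(pred, block.getColumns());
    if (!drop) {
      // the row group may contain matching records and must be read
    }
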
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
index 81a39b8..5bd6869 100644
--- a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
@@ -15,9 +15,6 @@
  */
 package parquet.format.converter;
 
-import static parquet.format.Util.readFileMetaData;
-import static parquet.format.Util.writePageHeader;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -33,8 +30,9 @@ import java.util.Map.Entry;
 import java.util.Set;
 
 import parquet.Log;
-import parquet.format.ConvertedType;
+import parquet.common.schema.ColumnPath;
 import parquet.format.ColumnChunk;
+import parquet.format.ConvertedType;
 import parquet.format.DataPageHeader;
 import parquet.format.DictionaryPageHeader;
 import parquet.format.Encoding;
@@ -49,11 +47,9 @@ import parquet.format.Statistics;
 import parquet.format.Type;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ColumnPath;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.io.ParquetDecodingException;
-import parquet.schema.Types;
 import parquet.schema.GroupType;
 import parquet.schema.MessageType;
 import parquet.schema.OriginalType;
@@ -61,6 +57,10 @@ import parquet.schema.PrimitiveType;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 import parquet.schema.Type.Repetition;
 import parquet.schema.TypeVisitor;
+import parquet.schema.Types;
+
+import static parquet.format.Util.readFileMetaData;
+import static parquet.format.Util.writePageHeader;
 
 public class ParquetMetadataConverter {
   private static final Log LOG = Log.getLog(ParquetMetadataConverter.class);

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
index f3aa81f..5a9b019 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/InternalParquetRecordReader.java
@@ -18,12 +18,16 @@ package parquet.hadoop;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+
 import parquet.Log;
 import parquet.column.ColumnDescriptor;
 import parquet.column.page.PageReadStore;
 import parquet.filter.UnboundRecordFilter;
+import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.FilterCompat.Filter;
 import parquet.hadoop.api.ReadSupport;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.util.counters.BenchmarkCounter;
@@ -37,12 +41,14 @@ import parquet.schema.Type;
 
 import static java.lang.String.format;
 import static parquet.Log.DEBUG;
+import static parquet.Preconditions.checkNotNull;
 import static parquet.hadoop.ParquetInputFormat.STRICT_TYPE_CHECKING;
 
 class InternalParquetRecordReader<T> {
   private static final Log LOG = Log.getLog(InternalParquetRecordReader.class);
 
   private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
+  private final Filter filter;
 
   private MessageType requestedSchema;
   private MessageType fileSchema;
@@ -57,7 +63,6 @@ class InternalParquetRecordReader<T> {
   private int currentBlock = -1;
   private ParquetFileReader reader;
   private parquet.io.RecordReader<T> recordReader;
-  private UnboundRecordFilter recordFilter;
   private boolean strictTypeChecking;
 
   private long totalTimeSpentReadingBytes;
@@ -70,19 +75,28 @@ class InternalParquetRecordReader<T> {
 
   /**
    * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
+   * @param filter for filtering individual records
+   */
+  public InternalParquetRecordReader(ReadSupport<T> readSupport, Filter filter) {
+    this.readSupport = readSupport;
+    this.filter = checkNotNull(filter, "filter");
+  }
+
+  /**
+   * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
    */
   public InternalParquetRecordReader(ReadSupport<T> readSupport) {
-    this(readSupport, null);
+    this(readSupport, FilterCompat.NOOP);
   }
 
   /**
    * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
    * @param filter Optional filter for only returning matching records.
+   * @deprecated use {@link #InternalParquetRecordReader(ReadSupport, Filter)}
    */
-  public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter
-      filter) {
-    this.readSupport = readSupport;
-    this.recordFilter = filter;
+  @Deprecated
+  public InternalParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
+    this(readSupport, FilterCompat.get(filter));
   }
 
   private void checkRead() throws IOException {
@@ -109,7 +123,7 @@ class InternalParquetRecordReader<T> {
       LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
       if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
       MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema, strictTypeChecking);
-      recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
+      recordReader = columnIO.getRecordReader(pages, recordConverter, filter);
       startedAssemblingCurrentBlockAt = System.currentTimeMillis();
       totalCountLoadedSoFar += pages.getRowCount();
       ++ currentBlock;
@@ -169,27 +183,36 @@ class InternalParquetRecordReader<T> {
   }
 
   public boolean nextKeyValue() throws IOException, InterruptedException {
-    if (current < total) {
+    boolean recordFound = false;
+
+    while (!recordFound) {
+      // no more records left
+      if (current >= total) { return false; }
+
       try {
         checkRead();
         currentValue = recordReader.read();
         current ++;
-        while (currentValue == null) { // only happens with FilteredRecordReader at end of block
+        if (recordReader.shouldSkipCurrentRecord()) {
+          // this record is being filtered via the filter2 package
+          if (DEBUG) LOG.debug("skipping record");
+          continue;
+        }
+
+        if (currentValue == null) {
+          // only happens with FilteredRecordReader at end of block
           current = totalCountLoadedSoFar;
-          if (current < total) {
-            checkRead();
-            currentValue = recordReader.read();
-            current ++;
-            continue;
-          }
-          return false;
+          if (DEBUG) LOG.debug("filtered record reader reached end of block");
+          continue;
         }
+
+        recordFound = true;
+
         if (DEBUG) LOG.debug("read value: " + currentValue);
       } catch (RuntimeException e) {
         throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current, currentBlock, file), e);
       }
-      return true;
     }
-    return false;
+    return true;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
index 2a2f054..e660c9f 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
@@ -15,11 +15,6 @@
  */
 package parquet.hadoop;
 
-import static parquet.Log.DEBUG;
-import static parquet.bytes.BytesUtils.readIntLittleEndian;
-import static parquet.hadoop.ParquetFileWriter.MAGIC;
-import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
-
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.IOException;
@@ -52,6 +47,7 @@ import parquet.column.ColumnDescriptor;
 import parquet.column.page.DictionaryPage;
 import parquet.column.page.Page;
 import parquet.column.page.PageReadStore;
+import parquet.common.schema.ColumnPath;
 import parquet.format.PageHeader;
 import parquet.format.Util;
 import parquet.format.converter.ParquetMetadataConverter;
@@ -59,11 +55,15 @@ import parquet.hadoop.CodecFactory.BytesDecompressor;
 import parquet.hadoop.ColumnChunkPageReadStore.ColumnChunkPageReader;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ColumnPath;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.hadoop.util.counters.BenchmarkCounter;
 import parquet.io.ParquetDecodingException;
 
+import static parquet.Log.DEBUG;
+import static parquet.bytes.BytesUtils.readIntLittleEndian;
+import static parquet.hadoop.ParquetFileWriter.MAGIC;
+import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
+
 /**
  * Internal implementation of the Parquet file reader as a block container
  *

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
index ff29179..f3ef61b 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
@@ -15,9 +15,6 @@
  */
 package parquet.hadoop;
 
-import static parquet.Log.DEBUG;
-import static parquet.format.Util.writeFileMetaData;
-
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -40,10 +37,10 @@ import parquet.bytes.BytesUtils;
 import parquet.column.ColumnDescriptor;
 import parquet.column.page.DictionaryPage;
 import parquet.column.statistics.Statistics;
+import parquet.common.schema.ColumnPath;
 import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ColumnPath;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.hadoop.metadata.FileMetaData;
 import parquet.hadoop.metadata.GlobalMetaData;
@@ -52,6 +49,9 @@ import parquet.io.ParquetEncodingException;
 import parquet.schema.MessageType;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 
+import static parquet.Log.DEBUG;
+import static parquet.format.Util.writeFileMetaData;
+
 /**
  * Internal implementation of the Parquet file writer as a block container
  *

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
index 882d2f7..0231ccd 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
@@ -43,6 +43,10 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
 import parquet.Log;
 import parquet.filter.UnboundRecordFilter;
+import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.FilterCompat.Filter;
+import parquet.filter2.compat.RowGroupFilter;
+import parquet.filter2.predicate.FilterPredicate;
 import parquet.hadoop.api.InitContext;
 import parquet.hadoop.api.ReadSupport;
 import parquet.hadoop.api.ReadSupport.ReadContext;
@@ -53,10 +57,13 @@ import parquet.hadoop.metadata.GlobalMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.hadoop.util.ConfigurationUtil;
 import parquet.hadoop.util.ContextUtil;
+import parquet.hadoop.util.SerializationUtil;
 import parquet.io.ParquetDecodingException;
 import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 
+import static parquet.Preconditions.checkArgument;
+
 /**
  * The input format to read a Parquet file.
  *
@@ -88,6 +95,11 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
    */
   public static final String STRICT_TYPE_CHECKING = "parquet.strict.typing";
 
+  /**
+   * key to configure the filter predicate
+   */
+  public static final String FILTER_PREDICATE = "parquet.private.read.filter.predicate";
+
   private static final int MIN_FOOTER_CACHE_SIZE = 100;
 
   private LruCache<FileStatusWrapper, FootersCacheValue> footersCache;
@@ -99,13 +111,40 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
   public static void setUnboundRecordFilter(Job job, Class<? extends UnboundRecordFilter> filterClass) {
-    ContextUtil.getConfiguration(job).set(UNBOUND_RECORD_FILTER, filterClass.getName());
+    Configuration conf = ContextUtil.getConfiguration(job);
+    checkArgument(getFilterPredicate(conf) == null,
+        "You cannot provide an UnboundRecordFilter after providing a FilterPredicate");
+
+    conf.set(UNBOUND_RECORD_FILTER, filterClass.getName());
   }
 
+  /**
+   * @deprecated use {@link #getFilter(Configuration)}
+   */
+  @Deprecated
   public static Class<?> getUnboundRecordFilter(Configuration configuration) {
     return ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
   }
 
+  private static UnboundRecordFilter getUnboundRecordFilterInstance(Configuration configuration) {
+    Class<?> clazz = ConfigurationUtil.getClassFromConfig(configuration, UNBOUND_RECORD_FILTER, UnboundRecordFilter.class);
+    if (clazz == null) { return null; }
+
+    try {
+      UnboundRecordFilter unboundRecordFilter = (UnboundRecordFilter) clazz.newInstance();
+
+      if (unboundRecordFilter instanceof Configurable) {
+        ((Configurable)unboundRecordFilter).setConf(configuration);
+      }
+
+      return unboundRecordFilter;
+    } catch (InstantiationException e) {
+      throw new BadConfigurationException("could not instantiate unbound record filter class", e);
+    } catch (IllegalAccessException e) {
+      throw new BadConfigurationException("could not instantiate unbound record filter class", e);
+    }
+  }
+
   public static void setReadSupportClass(JobConf conf, Class<?> readSupportClass) {
     conf.set(READ_SUPPORT_CLASS, readSupportClass.getName());
   }
@@ -114,6 +153,34 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
     return ConfigurationUtil.getClassFromConfig(configuration, READ_SUPPORT_CLASS, ReadSupport.class);
   }
 
+  public static void setFilterPredicate(Configuration configuration, FilterPredicate filterPredicate) {
+    checkArgument(getUnboundRecordFilter(configuration) == null,
+        "You cannot provide a FilterPredicate after providing an UnboundRecordFilter");
+
+    configuration.set(FILTER_PREDICATE + ".human.readable", filterPredicate.toString());
+    try {
+      SerializationUtil.writeObjectToConfAsBase64(FILTER_PREDICATE, filterPredicate, configuration);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static FilterPredicate getFilterPredicate(Configuration configuration) {
+    try {
+      return SerializationUtil.readObjectFromConfAsBase64(FILTER_PREDICATE, configuration);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Returns a non-null Filter, which is a wrapper around either a
+   * FilterPredicate, an UnboundRecordFilter, or a no-op filter.
+   */
+  public static Filter getFilter(Configuration conf) {
+    return FilterCompat.get(getFilterPredicate(conf), getUnboundRecordFilterInstance(conf));
+  }
+
   /**
    * Hadoop will instantiate using this constructor
    */
@@ -135,24 +202,12 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
   public RecordReader<Void, T> createRecordReader(
       InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+
+    ReadSupport<T> readSupport = getReadSupport(ContextUtil.getConfiguration(taskAttemptContext));
+
     Configuration conf = ContextUtil.getConfiguration(taskAttemptContext);
-    ReadSupport<T> readSupport = getReadSupport(conf);
-    Class<?> unboundRecordFilterClass = getUnboundRecordFilter(conf);
-    if (unboundRecordFilterClass == null) {
-      return new ParquetRecordReader<T>(readSupport);
-    } else {
-      try {
-        UnboundRecordFilter filter = (UnboundRecordFilter)unboundRecordFilterClass.newInstance();
-        if (filter instanceof Configurable) {
-          ((Configurable)filter).setConf(conf);
-        }
-        return new ParquetRecordReader<T>(readSupport, filter);
-      } catch (InstantiationException e) {
-        throw new BadConfigurationException("could not instantiate unbound record filter class", e);
-      } catch (IllegalAccessException e) {
-        throw new BadConfigurationException("could not instantiate unbound record filter class", e);
-      }
-    }
+
+    return new ParquetRecordReader<T>(readSupport, getFilter(conf));
   }
 
   /**
@@ -381,6 +436,12 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
         configuration,
         globalMetaData.getKeyValueMetaData(),
         globalMetaData.getSchema()));
+
+    Filter filter = getFilter(configuration);
+
+    long rowGroupsDropped = 0;
+    long totalRowGroups = 0;
+
     for (Footer footer : footers) {
       final Path file = footer.getFile();
       LOG.debug(file);
@@ -388,10 +449,21 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
       FileStatus fileStatus = fs.getFileStatus(file);
       ParquetMetadata parquetMetaData = footer.getParquetMetadata();
       List<BlockMetaData> blocks = parquetMetaData.getBlocks();
+
+      List<BlockMetaData> filteredBlocks = blocks;
+
+      totalRowGroups += blocks.size();
+      filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
+      rowGroupsDropped += blocks.size() - filteredBlocks.size();
+
+      if (filteredBlocks.isEmpty()) {
+        continue;
+      }
+
       BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
       splits.addAll(
           generateSplits(
-              blocks,
+              filteredBlocks,
               fileBlockLocations,
               fileStatus,
               parquetMetaData.getFileMetaData(),
@@ -401,6 +473,14 @@ public class ParquetInputFormat<T> extends FileInputFormat<Void, T> {
               maxSplitSize)
           );
     }
+
+    if (rowGroupsDropped > 0 && totalRowGroups > 0) {
+      int percentDropped = (int) ((((double) rowGroupsDropped) / totalRowGroups) * 100);
+      LOG.info("Dropping " + rowGroupsDropped + " row groups that do not pass filter predicate! (" + percentDropped + "%)");
+    } else {
+      LOG.info("There were no row groups that could be dropped due to filter predicates");
+    }
+
     return splits;
   }
 

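For context, a minimal sketch (not part of this patch) of how a job would use the predicate API above. It assumes the FilterApi factory methods (longColumn, binaryColumn, gt, eq, and) introduced elsewhere in this change are statically imported, that parquet.io.api.Binary is imported, and that the column names are purely hypothetical:

    Configuration conf = ContextUtil.getConfiguration(job);

    // build a FilterPredicate; the column names here are illustrative only
    FilterPredicate pred = and(
        gt(longColumn("timestamp"), 1400000000L),
        eq(binaryColumn("name"), Binary.fromString("julien")));

    // stored in the conf, base64-encoded via SerializationUtil (added below)
    ParquetInputFormat.setFilterPredicate(conf, pred);

    // on the read side, getSplits() and createRecordReader() recover either this
    // predicate or a configured UnboundRecordFilter through one unified wrapper
    Filter filter = ParquetInputFormat.getFilter(conf);
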
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
index 9e9b4ff..da0c2ec 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputSplit.java
@@ -40,9 +40,9 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import parquet.Log;
 import parquet.column.Encoding;
 import parquet.column.statistics.IntStatistics;
+import parquet.common.schema.ColumnPath;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ColumnPath;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
index 3e85331..c56a402 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetReader.java
@@ -17,7 +17,6 @@ package parquet.hadoop;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -30,6 +29,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import parquet.filter.UnboundRecordFilter;
+import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.FilterCompat.Filter;
+import parquet.filter2.compat.RowGroupFilter;
 import parquet.hadoop.api.InitContext;
 import parquet.hadoop.api.ReadSupport;
 import parquet.hadoop.api.ReadSupport.ReadContext;
@@ -37,26 +39,32 @@ import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.GlobalMetaData;
 import parquet.schema.MessageType;
 
+import static parquet.Preconditions.checkNotNull;
+
 /**
  * Read records from a Parquet file.
+ * TODO: too many constructors (https://issues.apache.org/jira/browse/PARQUET-39)
  */
 public class ParquetReader<T> implements Closeable {
 
-  private ReadSupport<T> readSupport;
-  private UnboundRecordFilter filter;
-  private Configuration conf;
-  private ReadContext readContext;
-  private Iterator<Footer> footersIterator;
+  private final ReadSupport<T> readSupport;
+  private final Configuration conf;
+  private final ReadContext readContext;
+  private final Iterator<Footer> footersIterator;
+  private final GlobalMetaData globalMetaData;
+  private final Filter filter;
+
   private InternalParquetRecordReader<T> reader;
-  private GlobalMetaData globalMetaData;
 
   /**
    * @param file the file to read
    * @param readSupport to materialize records
    * @throws IOException
+   * @deprecated use {@link #builder(ReadSupport, Path)}
    */
+  @Deprecated
   public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException {
-    this(file, readSupport, null);
+    this(new Configuration(), file, readSupport, FilterCompat.NOOP);
   }
 
   /**
@@ -64,31 +72,44 @@ public class ParquetReader<T> implements Closeable {
    * @param file the file to read
    * @param readSupport to materialize records
    * @throws IOException
+   * @deprecated use {@link #builder(ReadSupport, Path)}
    */
+  @Deprecated
   public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport) throws IOException {
-    this(conf, file, readSupport, null);
+    this(conf, file, readSupport, FilterCompat.NOOP);
   }
 
   /**
    * @param file the file to read
    * @param readSupport to materialize records
-   * @param filter the filter to use to filter records
+   * @param unboundRecordFilter the filter to use to filter records
    * @throws IOException
+   * @deprecated use {@link #builder(ReadSupport, Path)}
    */
-  public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
-    this(new Configuration(), file, readSupport, filter);
+  @Deprecated
+  public ParquetReader(Path file, ReadSupport<T> readSupport, UnboundRecordFilter unboundRecordFilter) throws IOException {
+    this(new Configuration(), file, readSupport, FilterCompat.get(unboundRecordFilter));
   }
 
   /**
    * @param conf the configuration
    * @param file the file to read
    * @param readSupport to materialize records
-   * @param filter the filter to use to filter records
+   * @param unboundRecordFilter the filter to use to filter records
    * @throws IOException
+   * @deprecated use {@link #builder(ReadSupport, Path)}
    */
-  public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter filter) throws IOException {
+  @Deprecated
+  public ParquetReader(Configuration conf, Path file, ReadSupport<T> readSupport, UnboundRecordFilter unboundRecordFilter) throws IOException {
+    this(conf, file, readSupport, FilterCompat.get(unboundRecordFilter));
+  }
+
+  private ParquetReader(Configuration conf,
+                       Path file,
+                       ReadSupport<T> readSupport,
+                       Filter filter) throws IOException {
     this.readSupport = readSupport;
-    this.filter = filter;
+    this.filter = checkNotNull(filter, "filter");
     this.conf = conf;
 
     FileSystem fs = file.getFileSystem(conf);
@@ -96,12 +117,6 @@ public class ParquetReader<T> implements Closeable {
     List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
     this.footersIterator = footers.iterator();
     globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
-
-    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
-    for (Footer footer : footers) {
-      blocks.addAll(footer.getParquetMetadata().getBlocks());
-    }
-
     MessageType schema = globalMetaData.getSchema();
     Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData();
     readContext = readSupport.init(new InitContext(conf, extraMetadata, schema));
@@ -131,10 +146,15 @@ public class ParquetReader<T> implements Closeable {
     }
     if (footersIterator.hasNext()) {
       Footer footer = footersIterator.next();
+
+      List<BlockMetaData> blocks = footer.getParquetMetadata().getBlocks();
+
+      List<BlockMetaData> filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, footer.getParquetMetadata().getFileMetaData().getSchema());
+
       reader = new InternalParquetRecordReader<T>(readSupport, filter);
       reader.initialize(
           readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData().getKeyValueMetaData(),
-          readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf);
+          readContext.getReadSupportMetadata(), footer.getFile(), filteredBlocks, conf);
     }
   }
 
@@ -144,4 +164,36 @@ public class ParquetReader<T> implements Closeable {
       reader.close();
     }
   }
+
+  public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
+    return new Builder<T>(readSupport, path);
+  }
+
+  public static class Builder<T> {
+    private final ReadSupport<T> readSupport;
+    private final Path file;
+    private Configuration conf;
+    private Filter filter;
+
+    private Builder(ReadSupport<T> readSupport, Path path) {
+      this.readSupport = checkNotNull(readSupport, "readSupport");
+      this.file = checkNotNull(path, "path");
+      this.conf = new Configuration();
+      this.filter = FilterCompat.NOOP;
+    }
+
+    public Builder<T> withConf(Configuration conf) {
+      this.conf = checkNotNull(conf, "conf");
+      return this;
+    }
+
+    public Builder<T> withFilter(Filter filter) {
+      this.filter = checkNotNull(filter, "filter");
+      return this;
+    }
+
+    public ParquetReader<T> build() throws IOException {
+      return new ParquetReader<T>(conf, file, readSupport, filter);
+    }
+  }
 }

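A short usage sketch for the new builder. GroupReadSupport, the path, and the predicate `pred` are illustrative only, and the single-argument FilterCompat.get(FilterPredicate) overload added in this change is assumed:

    ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), new Path("/tmp/data.parquet"))
            .withConf(new Configuration())
            .withFilter(FilterCompat.get(pred))   // defaults to FilterCompat.NOOP if omitted
            .build();
    try {
      Group record;
      while ((record = reader.read()) != null) {
        // process each materialized record
      }
    } finally {
      reader.close();
    }
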
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
index f6f4815..67c7dd7 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetRecordReader.java
@@ -16,17 +16,21 @@
 package parquet.hadoop;
 
 import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
 import parquet.Log;
 import parquet.filter.UnboundRecordFilter;
+import parquet.filter2.compat.FilterCompat;
+import parquet.filter2.compat.FilterCompat.Filter;
 import parquet.hadoop.api.ReadSupport;
-import parquet.hadoop.util.counters.BenchmarkCounter;
 import parquet.hadoop.util.ContextUtil;
+import parquet.hadoop.util.counters.BenchmarkCounter;
 import parquet.schema.MessageTypeParser;
 
 /**
@@ -41,24 +45,34 @@ import parquet.schema.MessageTypeParser;
 public class ParquetRecordReader<T> extends RecordReader<Void, T> {
 
   private static final Log LOG= Log.getLog(ParquetRecordReader.class);
-  private InternalParquetRecordReader<T> internalReader;
+  private final InternalParquetRecordReader<T> internalReader;
 
   /**
    * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
    */
   public ParquetRecordReader(ReadSupport<T> readSupport) {
-    this(readSupport, null);
+    this(readSupport, FilterCompat.NOOP);
   }
 
   /**
    * @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
-   * @param filter Optional filter for only returning matching records.
+   * @param filter for filtering individual records
    */
-  public ParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
+  public ParquetRecordReader(ReadSupport<T> readSupport, Filter filter) {
     internalReader = new InternalParquetRecordReader<T>(readSupport, filter);
   }
 
   /**
+   * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+   * @param filter for filtering individual records
+   * @deprecated use {@link #ParquetRecordReader(ReadSupport, Filter)}
+   */
+  @Deprecated
+  public ParquetRecordReader(ReadSupport<T> readSupport, UnboundRecordFilter filter) {
+    this(readSupport, FilterCompat.get(filter));
+  }
+
+  /**
    * {@inheritDoc}
    */
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java
index 9c24475..41f27ed 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetWriter.java
@@ -204,6 +204,19 @@ public class ParquetWriter<T> implements Closeable {
     this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
   }
 
+  public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport) throws IOException {
+    this(file,
+        writeSupport,
+        DEFAULT_COMPRESSION_CODEC_NAME,
+        DEFAULT_BLOCK_SIZE,
+        DEFAULT_PAGE_SIZE,
+        DEFAULT_PAGE_SIZE,
+        DEFAULT_IS_DICTIONARY_ENABLED,
+        DEFAULT_IS_VALIDATING_ENABLED,
+        DEFAULT_WRITER_VERSION,
+        conf);
+  }
+
   public void write(T object) throws IOException {
     try {
       writer.write(object);

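A brief sketch of the new convenience constructor that takes a Configuration. GroupWriteSupport is used only for illustration (any WriteSupport<T> works), and a MessageType named `schema` and a Group named `group` are assumed:

    Configuration conf = new Configuration();
    // GroupWriteSupport expects the schema to be registered in the conf first
    GroupWriteSupport.setSchema(schema, conf);
    ParquetWriter<Group> writer =
        new ParquetWriter<Group>(new Path("/tmp/out.parquet"), conf, new GroupWriteSupport());
    writer.write(group);
    writer.close();
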
http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
index d0b723b..9544865 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java
@@ -18,23 +18,18 @@ package parquet.hadoop.mapred;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.List;
 import java.util.Arrays;
+import java.util.List;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.InputFormat;
 
+import parquet.hadoop.Footer;
 import parquet.hadoop.ParquetInputFormat;
 import parquet.hadoop.ParquetInputSplit;
 import parquet.hadoop.ParquetRecordReader;
-import parquet.hadoop.Footer;
 
 @SuppressWarnings("deprecation")
 public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.FileInputFormat<Void, Container<V>> {
@@ -87,7 +82,9 @@ public class DeprecatedParquetInputFormat<V> extends org.apache.hadoop.mapred.Fi
       splitLen = oldSplit.getLength();
 
       try {
-        realReader = new ParquetRecordReader<V>(newInputFormat.getReadSupport(oldJobConf));
+        realReader = new ParquetRecordReader<V>(newInputFormat.getReadSupport(oldJobConf),
+            ParquetInputFormat.getFilter(oldJobConf));
+
         realReader.initialize(((ParquetInputSplitWrapper)oldSplit).realSplit, oldJobConf, reporter);
 
         // read once to gain access to key and value objects

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/metadata/Canonicalizer.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/Canonicalizer.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/Canonicalizer.java
deleted file mode 100644
index ece6e63..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/Canonicalizer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Copyright 2014 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.hadoop.metadata;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * returns canonical representation of objects (similar to String.intern()) to save memory
- * if a.equals(b) then canonicalize(a) == canonicalize(b)
- * this class is thread safe
- * @author Julien Le Dem
- *
- * @param <T>
- */
-public class Canonicalizer<T> {
-
-  private ConcurrentHashMap<T, T> canonicals = new ConcurrentHashMap<T, T>();
-
-  /**
-   * @param value the value to canonicalize
-   * @return the corresponding canonical value
-   */
-  final public T canonicalize(T value) {
-    T canonical = canonicals.get(value);
-    if (canonical == null) {
-      value = toCanonical(value);
-      T existing = canonicals.putIfAbsent(value, value);
-      // putIfAbsent is atomic, making sure we always return the same canonical representation of the value
-      if (existing == null) {
-        canonical = value;
-      } else {
-        canonical = existing;
-      }
-    }
-    return canonical;
-  }
-
-  /**
-   * @param value the value to canonicalize if needed
-   * @return the canonicalized value
-   */
-  protected T toCanonical(T value) {
-    return value;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java
index 98de367..45af78a 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -18,8 +18,9 @@ package parquet.hadoop.metadata;
 import java.util.Set;
 
 import parquet.column.Encoding;
-import parquet.column.statistics.Statistics;
 import parquet.column.statistics.BooleanStatistics;
+import parquet.column.statistics.Statistics;
+import parquet.common.schema.ColumnPath;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkProperties.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkProperties.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkProperties.java
index 074a900..9b9a7a8 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkProperties.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkProperties.java
@@ -19,6 +19,8 @@ import java.util.Arrays;
 import java.util.Set;
 
 import parquet.column.Encoding;
+import parquet.common.internal.Canonicalizer;
+import parquet.common.schema.ColumnPath;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 
 public class ColumnChunkProperties {

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnPath.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnPath.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnPath.java
deleted file mode 100644
index b179ae3..0000000
--- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnPath.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Copyright 2012 Twitter, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package parquet.hadoop.metadata;
-
-import java.util.Arrays;
-import java.util.Iterator;
-
-public final class ColumnPath implements Iterable<String> {
-
-  private static Canonicalizer<ColumnPath> paths = new Canonicalizer<ColumnPath>() {
-    protected ColumnPath toCanonical(ColumnPath value) {
-      String[] path = new String[value.p.length];
-      for (int i = 0; i < value.p.length; i++) {
-        path[i] = value.p[i].intern();
-      }
-      return new ColumnPath(path);
-    }
-  };
-
-  public static ColumnPath get(String... path){
-    return paths.canonicalize(new ColumnPath(path));
-  }
-
-  private final String[] p;
-
-  private ColumnPath(String[] path) {
-    this.p = path;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof ColumnPath) {
-      return Arrays.equals(p, ((ColumnPath)obj).p);
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return Arrays.hashCode(p);
-  }
-
-  @Override
-  public String toString() {
-    return Arrays.toString(p);
-  }
-
-  @Override
-  public Iterator<String> iterator() {
-    return Arrays.asList(p).iterator();
-  }
-
-  public int size() {
-    return p.length;
-  }
-
-  public String[] toArray() {
-    return p;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/metadata/EncodingList.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/EncodingList.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/EncodingList.java
index 790b601..6121111 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/EncodingList.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/EncodingList.java
@@ -20,6 +20,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import parquet.column.Encoding;
+import parquet.common.internal.Canonicalizer;
 
 public class EncodingList implements Iterable<Encoding> {
 

http://git-wip-us.apache.org/repos/asf/incubator-parquet-mr/blob/ad32bf0f/parquet-hadoop/src/main/java/parquet/hadoop/util/SerializationUtil.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/util/SerializationUtil.java b/parquet-hadoop/src/main/java/parquet/hadoop/util/SerializationUtil.java
new file mode 100644
index 0000000..0cd5df5
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/util/SerializationUtil.java
@@ -0,0 +1,93 @@
+package parquet.hadoop.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.conf.Configuration;
+
+import parquet.Closeables;
+import parquet.Log;
+
+/**
+ * Serialization utils copied from:
+ * https://github.com/kevinweil/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/util/HadoopUtils.java
+ *
+ * TODO: Refactor elephant-bird so that we can depend on utils like this without extra baggage.
+ */
+public final class SerializationUtil {
+  private static final Log LOG = Log.getLog(SerializationUtil.class);
+
+  private SerializationUtil() { }
+
+  /**
+   * Writes an object to a configuration, gzipped and base64-encoded, so that it
+   * can be read back with {@link #readObjectFromConfAsBase64}.
+   *
+   * @param key for the configuration
+   * @param obj the object to serialize; it must implement java.io.Serializable
+   * @param conf to write to
+   * @throws IOException
+   */
+  public static void writeObjectToConfAsBase64(String key, Object obj, Configuration conf) throws IOException {
+    ByteArrayOutputStream baos = null;
+    GZIPOutputStream gos = null;
+    ObjectOutputStream oos = null;
+
+    try {
+      baos = new ByteArrayOutputStream();
+      gos = new GZIPOutputStream(baos);
+      oos = new ObjectOutputStream(gos);
+      oos.writeObject(obj);
+    } finally {
+      Closeables.close(oos);
+      Closeables.close(gos);
+      Closeables.close(baos);
+    }
+
+    conf.set(key, new String(Base64.encodeBase64(baos.toByteArray()), "UTF-8"));
+  }
+
+  /**
+   * Reads an object (that was written using
+   * {@link #writeObjectToConfAsBase64}) from a configuration
+   *
+   * @param key for the configuration
+   * @param conf to read from
+   * @return the read object, or null if key is not present in conf
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> T readObjectFromConfAsBase64(String key, Configuration conf) throws IOException {
+    String b64 = conf.get(key);
+    if (b64 == null) {
+      return null;
+    }
+
+    byte[] bytes = Base64.decodeBase64(b64.getBytes("UTF-8"));
+
+    ByteArrayInputStream bais = null;
+    GZIPInputStream gis = null;
+    ObjectInputStream ois = null;
+
+    try {
+      bais = new ByteArrayInputStream(bytes);
+      gis = new GZIPInputStream(bais);
+      ois = new ObjectInputStream(gis);
+      return (T) ois.readObject();
+    } catch (ClassNotFoundException e) {
+      throw new IOException("Could not read object from config with key " + key, e);
+    } catch (ClassCastException e) {
+      throw new IOException("Couldn't cast object read from config with key " + key, e);
+    } finally {
+      Closeables.close(ois);
+      Closeables.close(gis);
+      Closeables.close(bais);
+    }
+  }
+}
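
A quick round-trip sketch of the helper. The key name is illustrative; the stored object only needs to be java.io.Serializable, which is what setFilterPredicate above relies on for FilterPredicates:

    Configuration conf = new Configuration();
    SerializationUtil.writeObjectToConfAsBase64("parquet.example.key", new long[] { 1L, 2L, 3L }, conf);
    long[] roundTripped = SerializationUtil.readObjectFromConfAsBase64("parquet.example.key", conf);
    // roundTripped now equals {1, 2, 3}; reading a key that was never set returns null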