Posted to commits@parquet.apache.org by bl...@apache.org on 2016/07/15 16:53:37 UTC

parquet-mr git commit: PARQUET-389: Support predicate push down on missing columns.

Repository: parquet-mr
Updated Branches:
  refs/heads/master 02ce9b094 -> 42662f875


PARQUET-389: Support predicate push down on missing columns.

Predicate push-down throws an exception when a predicate references a column that isn't in a file's schema. This makes push-down difficult to implement in engines where schemas evolve, because each task needs to process the predicates and prune references to columns that aren't in that task's file. This PR implements predicate evaluation for missing columns, treating all of their values as null, which allows engines to pass predicates as they are written.
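
For illustration, a minimal sketch of what this enables, using only the reader and FilterApi calls that also appear in the test added below. The column name "deleted" (assumed to be a column added to the table schema after some older files were written) and the class name are hypothetical:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

import static org.apache.parquet.filter2.predicate.FilterApi.and;
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.lt;
import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
import static org.apache.parquet.io.api.Binary.fromString;

public class PushDownWithEvolvedSchema {
  public static void main(String[] args) throws Exception {
    // "deleted" may be missing from older files; there it reads as all
    // nulls, so notEq(deleted, "true") is true for every row in those files
    FilterPredicate pred = and(
        lt(longColumn("id"), 500L),
        notEq(binaryColumn("deleted"), fromString("true")));

    // with this change, the same predicate can be pushed down to every
    // file without first pruning references to columns a file doesn't have
    ParquetReader<Group> reader = ParquetReader
        .builder(new GroupReadSupport(), new Path(args[0]))
        .withFilter(FilterCompat.get(pred))
        .build();
    try {
      for (Group record = reader.read(); record != null; record = reader.read()) {
        System.out.println(record);
      }
    } finally {
      reader.close();
    }
  }
}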

A future commit should rewrite the predicates to avoid the extra work currently done in record-level filtering, but that isn't included here because it is an optimization.
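
For reference, the null-comparison rules that the changes below apply to a missing column, where every value reads as null, can be summarized in a few lines. This standalone sketch (hypothetical names, not code from this commit) mirrors the comments in the filters:

public class MissingColumnSemantics {
  // eq: a null value matches only a null test value
  static boolean eq(Object testValue) { return testValue == null; }

  // notEq: a null value differs from every non-null test value,
  // and null is not "not equal" to null
  static boolean notEq(Object testValue) { return testValue != null; }

  // lt, ltEq, gt, gtEq: null is never ordered relative to any value, so
  // every comparison against a missing column is false and whole row
  // groups can be dropped (BLOCK_CANNOT_MATCH in the filters below)
  static boolean lt(Object testValue)   { return false; }
  static boolean ltEq(Object testValue) { return false; }
  static boolean gt(Object testValue)   { return false; }
  static boolean gtEq(Object testValue) { return false; }
}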

Author: Ryan Blue <bl...@apache.org>

Closes #354 from rdblue/PARQUET-389-predicate-push-down-on-missing-columns and squashes the following commits:

b4d809a [Ryan Blue] PARQUET-389: Support record-level filtering with missing columns.
91b841c [Ryan Blue] PARQUET-389: Add missing column support to StatisticsFilter.
275f950 [Ryan Blue] PARQUET-389: Add missing column support to DictionaryFilter.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/42662f87
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/42662f87
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/42662f87

Branch: refs/heads/master
Commit: 42662f8750a2c33ee169f17f4b4e4586db98d869
Parents: 02ce9b0
Author: Ryan Blue <bl...@apache.org>
Authored: Fri Jul 15 09:53:33 2016 -0700
Committer: Ryan Blue <bl...@apache.org>
Committed: Fri Jul 15 09:53:33 2016 -0700

----------------------------------------------------------------------
 .../predicate/SchemaCompatibilityValidator.java |   9 +-
 .../dictionarylevel/DictionaryFilter.java       |  80 ++++--
 .../statisticslevel/StatisticsFilter.java       | 138 +++++++---
 .../filter2/TestFiltersWithMissingColumns.java  | 265 +++++++++++++++++++
 .../dictionarylevel/DictionaryFilterTest.java   |  54 ++++
 .../statisticslevel/TestStatisticsFilter.java   |  39 ++-
 6 files changed, 508 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/42662f87/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
index d4e1211..64477f5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/SchemaCompatibilityValidator.java
@@ -167,6 +167,11 @@ public class SchemaCompatibilityValidator implements FilterPredicate.Visitor<Voi
     }
 
     ColumnDescriptor descriptor = getColumnDescriptor(path);
+    if (descriptor == null) {
+      // the column is missing from the schema. evaluation calls
+      // updateNull() when a value is missing, so this will be handled correctly.
+      return;
+    }
 
     if (descriptor.getMaxRepetitionLevel() > 0) {
       throw new IllegalArgumentException("FilterPredicates do not currently support repeated columns. "
@@ -177,8 +182,6 @@ public class SchemaCompatibilityValidator implements FilterPredicate.Visitor<Voi
   }
 
   private ColumnDescriptor getColumnDescriptor(ColumnPath columnPath) {
-    ColumnDescriptor cd = columnsAccordingToSchema.get(columnPath);
-    checkArgument(cd != null, "Column " + columnPath + " was not found in schema!");
-    return cd;
+    return columnsAccordingToSchema.get(columnPath);
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/42662f87/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
index dc1d649..bf99435 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
@@ -69,9 +69,7 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
   }
 
   private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
-    ColumnChunkMetaData c = columns.get(columnPath);
-    checkArgument(c != null, "Column " + columnPath.toDotString() + " not found in schema!");
-    return c;
+    return columns.get(columnPath);
   }
 
   @SuppressWarnings("unchecked")
@@ -110,22 +108,26 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
 
   @Override
   public <T extends Comparable<T>> Boolean visit(Eq<T> eq) {
-    Column<T> filterColumn = eq.getColumn();
-    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+    T value = eq.getValue();
 
-    // if the chunk has non-dictionary pages, don't bother decoding the
-    // dictionary because the row group can't be eliminated.
-    if (hasNonDictionaryPages(meta)) {
+    if (value == null) {
+      // the dictionary contains only non-null values so isn't helpful. this
+      // could check the column stats, but the StatisticsFilter is responsible
       return BLOCK_MIGHT_MATCH;
     }
 
-    T value = eq.getValue();
+    Column<T> filterColumn = eq.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
 
-    filterColumn.getColumnPath();
+    if (meta == null) {
+      // the column isn't in this file so all values are null, but the value
+      // must be non-null because of the above check.
+      return BLOCK_CANNOT_MATCH;
+    }
 
-    if (value == null) {
-      // the dictionary contains only non-null values so isn't helpful. this
-      // could check the column stats, but the StatisticsFilter is responsible
+    // if the chunk has non-dictionary pages, don't bother decoding the
+    // dictionary because the row group can't be eliminated.
+    if (hasNonDictionaryPages(meta)) {
       return BLOCK_MIGHT_MATCH;
     }
 
@@ -146,15 +148,13 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
     Column<T> filterColumn = notEq.getColumn();
     ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
 
-    // if the chunk has non-dictionary pages, don't bother decoding the
-    // dictionary because the row group can't be eliminated.
-    if (hasNonDictionaryPages(meta)) {
-      return BLOCK_MIGHT_MATCH;
-    }
-
     T value = notEq.getValue();
 
-    filterColumn.getColumnPath();
+    if (value == null && meta == null) {
+      // the predicate value is null and all rows have a null value, so the
+      // predicate is always false (null != null)
+      return BLOCK_CANNOT_MATCH;
+    }
 
     if (value == null) {
       // the dictionary contains only non-null values so isn't helpful. this
@@ -162,6 +162,18 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
       return BLOCK_MIGHT_MATCH;
     }
 
+    if (meta == null) {
+      // column is missing from this file and is always null and not equal to
+      // the non-null test value, so the predicate is true for all rows
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    // if the chunk has non-dictionary pages, don't bother decoding the
+    // dictionary because the row group can't be eliminated.
+    if (hasNonDictionaryPages(meta)) {
+      return BLOCK_MIGHT_MATCH;
+    }
+
     try {
       Set<T> dictSet = expandDictionary(meta);
       if (dictSet != null && dictSet.size() == 1 && dictSet.contains(value)) {
@@ -179,6 +191,12 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
     Column<T> filterColumn = lt.getColumn();
     ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
 
+    if (meta == null) {
+      // the column is missing and always null, which is never less than a
+      // value. for all x, null is never < x.
+      return BLOCK_CANNOT_MATCH;
+    }
+
     // if the chunk has non-dictionary pages, don't bother decoding the
     // dictionary because the row group can't be eliminated.
     if (hasNonDictionaryPages(meta)) {
@@ -187,8 +205,6 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
 
     T value = lt.getValue();
 
-    filterColumn.getColumnPath();
-
     try {
       Set<T> dictSet = expandDictionary(meta);
       if (dictSet == null) {
@@ -214,6 +230,12 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
     Column<T> filterColumn = ltEq.getColumn();
     ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
 
+    if (meta == null) {
+      // the column is missing and always null, which is never less than or
+      // equal to a value. for all x, null is never <= x.
+      return BLOCK_CANNOT_MATCH;
+    }
+
     // if the chunk has non-dictionary pages, don't bother decoding the
     // dictionary because the row group can't be eliminated.
     if (hasNonDictionaryPages(meta)) {
@@ -249,6 +271,12 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
     Column<T> filterColumn = gt.getColumn();
     ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
 
+    if (meta == null) {
+      // the column is missing and always null, which is never greater than a
+      // value. for all x, null is never > x.
+      return BLOCK_CANNOT_MATCH;
+    }
+
     // if the chunk has non-dictionary pages, don't bother decoding the
     // dictionary because the row group can't be eliminated.
     if (hasNonDictionaryPages(meta)) {
@@ -257,8 +285,6 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
 
     T value = gt.getValue();
 
-    filterColumn.getColumnPath();
-
     try {
       Set<T> dictSet = expandDictionary(meta);
       if (dictSet == null) {
@@ -284,6 +310,12 @@ public class DictionaryFilter implements FilterPredicate.Visitor<Boolean> {
     Column<T> filterColumn = gtEq.getColumn();
     ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
 
+    if (meta == null) {
+      // the column is missing and always null, which is never greater than or
+      // equal to a value. for all x, null is never >= x.
+      return BLOCK_CANNOT_MATCH;
+    }
+
     // if the chunk has non-dictionary pages, don't bother decoding the
     // dictionary because the row group can't be eliminated.
     if (hasNonDictionaryPages(meta)) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/42662f87/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
index 7f2187a..b37297a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/statisticslevel/StatisticsFilter.java
@@ -64,6 +64,9 @@ import static org.apache.parquet.Preconditions.checkNotNull;
 // TODO: (https://issues.apache.org/jira/browse/PARQUET-38)
 public class StatisticsFilter implements FilterPredicate.Visitor<Boolean> {
 
+  private static final boolean BLOCK_MIGHT_MATCH = false;
+  private static final boolean BLOCK_CANNOT_MATCH = true;
+
   public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns) {
     checkNotNull(pred, "pred");
     checkNotNull(columns, "columns");
@@ -79,9 +82,7 @@ public class StatisticsFilter implements FilterPredicate.Visitor<Boolean> {
   }
 
   private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
-    ColumnChunkMetaData c = columns.get(columnPath);
-    checkArgument(c != null, "Column " + columnPath.toDotString() + " not found in schema!");
-    return c;
+    return columns.get(columnPath);
   }
 
   // is this column chunk composed entirely of nulls?
@@ -97,27 +98,39 @@ public class StatisticsFilter implements FilterPredicate.Visitor<Boolean> {
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public <T extends Comparable<T>> Boolean visit(Eq<T> eq) {
     Column<T> filterColumn = eq.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
     T value = eq.getValue();
-    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
-    Statistics<T> stats = columnChunk.getStatistics();
+
+    if (meta == null) {
+      // the column isn't in this file so all values are null.
+      if (value != null) {
+        // non-null is never null
+        return BLOCK_CANNOT_MATCH;
+      }
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    Statistics<T> stats = meta.getStatistics();
 
     if (stats.isEmpty()) {
       // we have no statistics available, we cannot drop any chunks
-      return false;
+      return BLOCK_MIGHT_MATCH;
     }
 
     if (value == null) {
       // we are looking for records where v eq(null)
       // so drop if there are no nulls in this chunk
-      return !hasNulls(columnChunk);
+      return !hasNulls(meta);
     }
 
-    if (isAllNulls(columnChunk)) {
+    if (isAllNulls(meta)) {
       // we are looking for records where v eq(someNonNull)
       // and this is a column of all nulls, so drop it
-      return true;
+      return BLOCK_CANNOT_MATCH;
     }
 
     // drop if value < min || value > max
@@ -125,27 +138,38 @@ public class StatisticsFilter implements FilterPredicate.Visitor<Boolean> {
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public <T extends Comparable<T>> Boolean visit(NotEq<T> notEq) {
     Column<T> filterColumn = notEq.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
     T value = notEq.getValue();
-    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
-    Statistics<T> stats = columnChunk.getStatistics();
+
+    if (meta == null) {
+      if (value == null) {
+        // null is always equal to null
+        return BLOCK_CANNOT_MATCH;
+      }
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    Statistics<T> stats = meta.getStatistics();
 
     if (stats.isEmpty()) {
       // we have no statistics available, we cannot drop any chunks
-      return false;
+      return BLOCK_MIGHT_MATCH;
     }
 
     if (value == null) {
       // we are looking for records where v notEq(null)
       // so, if this is a column of all nulls, we can drop it
-      return isAllNulls(columnChunk);
+      return isAllNulls(meta);
     }
 
-    if (hasNulls(columnChunk)) {
+    if (hasNulls(meta)) {
       // we are looking for records where v notEq(someNonNull)
       // but this chunk contains nulls, we cannot drop it
-      return false;
+      return BLOCK_MIGHT_MATCH;
     }
 
     // drop if this is a column where min = max = value
@@ -153,89 +177,125 @@ public class StatisticsFilter implements FilterPredicate.Visitor<Boolean> {
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public <T extends Comparable<T>> Boolean visit(Lt<T> lt) {
     Column<T> filterColumn = lt.getColumn();
-    T value = lt.getValue();
-    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
-    Statistics<T> stats = columnChunk.getStatistics();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
+    if (meta == null) {
+      // the column is missing and always null, which is never less than a
+      // value. for all x, null is never < x.
+      return BLOCK_CANNOT_MATCH;
+    }
+
+    Statistics<T> stats = meta.getStatistics();
 
     if (stats.isEmpty()) {
       // we have no statistics available, we cannot drop any chunks
-      return false;
+      return BLOCK_MIGHT_MATCH;
     }
 
-    if (isAllNulls(columnChunk)) {
+    if (isAllNulls(meta)) {
       // we are looking for records where v < someValue
       // this chunk is all nulls, so we can drop it
-      return true;
+      return BLOCK_CANNOT_MATCH;
     }
 
+    T value = lt.getValue();
+
     // drop if value <= min
     return value.compareTo(stats.genericGetMin()) <= 0;
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public <T extends Comparable<T>> Boolean visit(LtEq<T> ltEq) {
     Column<T> filterColumn = ltEq.getColumn();
-    T value = ltEq.getValue();
-    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
-    Statistics<T> stats = columnChunk.getStatistics();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
+    if (meta == null) {
+      // the column is missing and always null, which is never less than or
+      // equal to a value. for all x, null is never <= x.
+      return BLOCK_CANNOT_MATCH;
+    }
+
+    Statistics<T> stats = meta.getStatistics();
 
     if (stats.isEmpty()) {
       // we have no statistics available, we cannot drop any chunks
-      return false;
+      return BLOCK_MIGHT_MATCH;
     }
 
-    if (isAllNulls(columnChunk)) {
+    if (isAllNulls(meta)) {
       // we are looking for records where v <= someValue
       // this chunk is all nulls, so we can drop it
-      return true;
+      return BLOCK_CANNOT_MATCH;
     }
 
+    T value = ltEq.getValue();
+
     // drop if value < min
     return value.compareTo(stats.genericGetMin()) < 0;
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public <T extends Comparable<T>> Boolean visit(Gt<T> gt) {
     Column<T> filterColumn = gt.getColumn();
-    T value = gt.getValue();
-    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
-    Statistics<T> stats = columnChunk.getStatistics();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
+    if (meta == null) {
+      // the column is missing and always null, which is never greater than a
+      // value. for all x, null is never > x.
+      return BLOCK_CANNOT_MATCH;
+    }
+
+    Statistics<T> stats = meta.getStatistics();
 
     if (stats.isEmpty()) {
       // we have no statistics available, we cannot drop any chunks
-      return false;
+      return BLOCK_MIGHT_MATCH;
     }
 
-    if (isAllNulls(columnChunk)) {
+    if (isAllNulls(meta)) {
       // we are looking for records where v > someValue
       // this chunk is all nulls, so we can drop it
-      return true;
+      return BLOCK_CANNOT_MATCH;
     }
 
+    T value = gt.getValue();
+
     // drop if value >= max
     return value.compareTo(stats.genericGetMax()) >= 0;
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public <T extends Comparable<T>> Boolean visit(GtEq<T> gtEq) {
     Column<T> filterColumn = gtEq.getColumn();
-    T value = gtEq.getValue();
-    ColumnChunkMetaData columnChunk = getColumnChunk(filterColumn.getColumnPath());
-    Statistics<T> stats = columnChunk.getStatistics();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+
+    if (meta == null) {
+      // the column is missing and always null, which is never greater than or
+      // equal to a value. for all x, null is never >= x.
+      return BLOCK_CANNOT_MATCH;
+    }
+
+    Statistics<T> stats = meta.getStatistics();
 
     if (stats.isEmpty()) {
       // we have no statistics available, we cannot drop any chunks
-      return false;
+      return BLOCK_MIGHT_MATCH;
     }
 
-    if (isAllNulls(columnChunk)) {
+    if (isAllNulls(meta)) {
       // we are looking for records where v >= someValue
       // this chunk is all nulls, so we can drop it
-      return true;
+      return BLOCK_CANNOT_MATCH;
     }
 
+    T value = gtEq.getValue();
+
     // drop if value > max
     return value.compareTo(stats.genericGetMax()) > 0;
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/42662f87/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java
new file mode 100644
index 0000000..3282f27
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/TestFiltersWithMissingColumns.java
@@ -0,0 +1,265 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.filter2;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.and;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.gt;
+import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.lt;
+import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
+import static org.apache.parquet.filter2.predicate.FilterApi.or;
+import static org.apache.parquet.io.api.Binary.fromString;
+import static org.apache.parquet.schema.OriginalType.UTF8;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.junit.Assert.assertEquals;
+
+public class TestFiltersWithMissingColumns {
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  public Path path;
+
+  @Before
+  public void createDataFile() throws Exception {
+    File file = temp.newFile("test.parquet");
+    this.path = new Path(file.toString());
+
+    MessageType type = Types.buildMessage()
+        .required(INT64).named("id")
+        .required(BINARY).as(UTF8).named("data")
+        .named("test");
+
+    SimpleGroupFactory factory = new SimpleGroupFactory(type);
+
+    ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+        .withType(type)
+        .build();
+
+    try {
+      for (long i = 0; i < 1000; i += 1) {
+        Group g = factory.newGroup();
+        g.add(0, i);
+        g.add(1, "data-" + i);
+        writer.write(g);
+      }
+    } finally {
+      writer.close();
+    }
+  }
+
+  @Test
+  public void testNormalFilter() throws Exception {
+    assertEquals(500, countFilteredRecords(path, lt(longColumn("id"), 500L)));
+  }
+
+  @Test
+  public void testSimpleMissingColumnFilter() throws Exception {
+    assertEquals(0, countFilteredRecords(path, lt(longColumn("missing"), 500L)));
+  }
+
+  @Test
+  public void testAndMissingColumnFilter() throws Exception {
+    // missing column filter is true
+    assertEquals(500, countFilteredRecords(path, and(
+        lt(longColumn("id"), 500L),
+        eq(binaryColumn("missing"), null)
+    )));
+    assertEquals(500, countFilteredRecords(path, and(
+        lt(longColumn("id"), 500L),
+        notEq(binaryColumn("missing"), fromString("any"))
+    )));
+
+    assertEquals(500, countFilteredRecords(path, and(
+        eq(binaryColumn("missing"), null),
+        lt(longColumn("id"), 500L)
+    )));
+    assertEquals(500, countFilteredRecords(path, and(
+        notEq(binaryColumn("missing"), fromString("any")),
+        lt(longColumn("id"), 500L)
+    )));
+
+    // missing column filter is false
+    assertEquals(0, countFilteredRecords(path, and(
+        lt(longColumn("id"), 500L),
+        eq(binaryColumn("missing"), fromString("any"))
+    )));
+    assertEquals(0, countFilteredRecords(path, and(
+        lt(longColumn("id"), 500L),
+        notEq(binaryColumn("missing"), null)
+    )));
+    assertEquals(0, countFilteredRecords(path, and(
+        lt(longColumn("id"), 500L),
+        lt(doubleColumn("missing"), 33.33)
+    )));
+    assertEquals(0, countFilteredRecords(path, and(
+        lt(longColumn("id"), 500L),
+        ltEq(doubleColumn("missing"), 33.33)
+    )));
+    assertEquals(0, countFilteredRecords(path, and(
+        lt(longColumn("id"), 500L),
+        gt(doubleColumn("missing"), 33.33)
+    )));
+    assertEquals(0, countFilteredRecords(path, and(
+        lt(longColumn("id"), 500L),
+        gtEq(doubleColumn("missing"), 33.33)
+    )));
+
+    assertEquals(0, countFilteredRecords(path, and(
+        eq(binaryColumn("missing"), fromString("any")),
+        lt(longColumn("id"), 500L)
+    )));
+    assertEquals(0, countFilteredRecords(path, and(
+        notEq(binaryColumn("missing"), null),
+        lt(longColumn("id"), 500L)
+    )));
+    assertEquals(0, countFilteredRecords(path, and(
+        lt(doubleColumn("missing"), 33.33),
+        lt(longColumn("id"), 500L)
+    )));
+    assertEquals(0, countFilteredRecords(path, and(
+        ltEq(doubleColumn("missing"), 33.33),
+        lt(longColumn("id"), 500L)
+    )));
+    assertEquals(0, countFilteredRecords(path, and(
+        gt(doubleColumn("missing"), 33.33),
+        lt(longColumn("id"), 500L)
+    )));
+    assertEquals(0, countFilteredRecords(path, and(
+        gtEq(doubleColumn("missing"), 33.33),
+        lt(longColumn("id"), 500L)
+    )));
+  }
+
+  @Test
+  public void testOrMissingColumnFilter() throws Exception {
+    // missing column filter is false
+    assertEquals(500, countFilteredRecords(path, or(
+        lt(longColumn("id"), 500L),
+        eq(binaryColumn("missing"), fromString("any"))
+    )));
+    assertEquals(500, countFilteredRecords(path, or(
+        lt(longColumn("id"), 500L),
+        notEq(binaryColumn("missing"), null)
+    )));
+    assertEquals(500, countFilteredRecords(path, or(
+        lt(longColumn("id"), 500L),
+        lt(doubleColumn("missing"), 33.33)
+    )));
+    assertEquals(500, countFilteredRecords(path, or(
+        lt(longColumn("id"), 500L),
+        ltEq(doubleColumn("missing"), 33.33)
+    )));
+    assertEquals(500, countFilteredRecords(path, or(
+        lt(longColumn("id"), 500L),
+        gt(doubleColumn("missing"), 33.33)
+    )));
+    assertEquals(500, countFilteredRecords(path, or(
+        lt(longColumn("id"), 500L),
+        gtEq(doubleColumn("missing"), 33.33)
+    )));
+
+    assertEquals(500, countFilteredRecords(path, or(
+        eq(binaryColumn("missing"), fromString("any")),
+        lt(longColumn("id"), 500L)
+    )));
+    assertEquals(500, countFilteredRecords(path, or(
+        notEq(binaryColumn("missing"), null),
+        lt(longColumn("id"), 500L)
+    )));
+    assertEquals(500, countFilteredRecords(path, or(
+        lt(doubleColumn("missing"), 33.33),
+        lt(longColumn("id"), 500L)
+    )));
+    assertEquals(500, countFilteredRecords(path, or(
+        ltEq(doubleColumn("missing"), 33.33),
+        lt(longColumn("id"), 500L)
+    )));
+    assertEquals(500, countFilteredRecords(path, or(
+        gt(doubleColumn("missing"), 33.33),
+        lt(longColumn("id"), 500L)
+    )));
+    assertEquals(500, countFilteredRecords(path, or(
+        gtEq(doubleColumn("missing"), 33.33),
+        lt(longColumn("id"), 500L)
+    )));
+
+    // missing column filter is true
+    assertEquals(1000, countFilteredRecords(path, or(
+        lt(longColumn("id"), 500L),
+        eq(binaryColumn("missing"), null)
+    )));
+    assertEquals(1000, countFilteredRecords(path, or(
+        lt(longColumn("id"), 500L),
+        notEq(binaryColumn("missing"), fromString("any"))
+    )));
+
+    assertEquals(1000, countFilteredRecords(path, or(
+        eq(binaryColumn("missing"), null),
+        lt(longColumn("id"), 500L)
+    )));
+    assertEquals(1000, countFilteredRecords(path, or(
+        notEq(binaryColumn("missing"), fromString("any")),
+        lt(longColumn("id"), 500L)
+    )));
+  }
+
+  public static long countFilteredRecords(Path path, FilterPredicate pred) throws IOException {
+    ParquetReader<Group> reader = ParquetReader
+        .builder(new GroupReadSupport(), path)
+        .withFilter(FilterCompat.get(pred))
+        .build();
+
+    long count = 0;
+    try {
+      while (reader.read() != null) {
+        count += 1;
+      }
+    } finally {
+      reader.close();
+    }
+    return count;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/42662f87/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
index 35b944d..7af0c40 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
@@ -383,6 +383,60 @@ public class DictionaryFilterTest {
     verifyZeroInteractions(dictionaryStore);
   }
 
+  @Test
+  public void testEqMissingColumn() throws Exception {
+    BinaryColumn b = binaryColumn("missing_column");
+
+    assertTrue("Should drop block for non-null query",
+        canDrop(eq(b, Binary.fromString("any")), ccmd, dictionaries));
+
+    assertFalse("Should not drop block null query",
+        canDrop(eq(b, null), ccmd, dictionaries));
+  }
+
+  @Test
+  public void testNotEqMissingColumn() throws Exception {
+    BinaryColumn b = binaryColumn("missing_column");
+
+    assertFalse("Should not drop block for non-null query",
+        canDrop(notEq(b, Binary.fromString("any")), ccmd, dictionaries));
+
+    assertTrue("Should not drop block null query",
+        canDrop(notEq(b, null), ccmd, dictionaries));
+  }
+
+  @Test
+  public void testLtMissingColumn() throws Exception {
+    BinaryColumn b = binaryColumn("missing_column");
+
+    assertTrue("Should drop block for any non-null query",
+        canDrop(lt(b, Binary.fromString("any")), ccmd, dictionaries));
+  }
+
+  @Test
+  public void testLtEqMissingColumn() throws Exception {
+    BinaryColumn b = binaryColumn("missing_column");
+
+    assertTrue("Should drop block for any non-null query",
+        canDrop(ltEq(b, Binary.fromString("any")), ccmd, dictionaries));
+  }
+
+  @Test
+  public void testGtMissingColumn() throws Exception {
+    BinaryColumn b = binaryColumn("missing_column");
+
+    assertTrue("Should drop block for any non-null query",
+        canDrop(gt(b, Binary.fromString("any")), ccmd, dictionaries));
+  }
+
+  @Test
+  public void testGtEqMissingColumn() throws Exception {
+    BinaryColumn b = binaryColumn("missing_column");
+
+    assertTrue("Should drop block for any non-null query",
+        canDrop(gtEq(b, Binary.fromString("any")), ccmd, dictionaries));
+  }
+
   private static double toDouble(int value) {
     return (value * 1.0);
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/42662f87/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
index 4e3fc7c..b47ed69 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/statisticslevel/TestStatisticsFilter.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.parquet.io.api.Binary;
 import org.junit.Test;
 
 import org.apache.parquet.column.Encoding;
@@ -30,6 +31,7 @@ import org.apache.parquet.column.statistics.IntStatistics;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
+import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
 import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
 import org.apache.parquet.filter2.predicate.Operators.IntColumn;
 import org.apache.parquet.filter2.predicate.Statistics;
@@ -38,6 +40,8 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.io.api.Binary.fromString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -78,6 +82,7 @@ public class TestStatisticsFilter {
 
   private static final IntColumn intColumn = intColumn("int.column");
   private static final DoubleColumn doubleColumn = doubleColumn("double.column");
+  private static final BinaryColumn missingColumn = binaryColumn("missing");
 
   private static final IntStatistics intStats = new IntStatistics();
   private static final IntStatistics nullIntStats = new IntStatistics();
@@ -109,6 +114,7 @@ public class TestStatisticsFilter {
 
     // drop columns of all nulls when looking for non-null value
     assertTrue(canDrop(eq(intColumn, 0), nullColumnMetas));
+    assertTrue(canDrop(eq(missingColumn, fromString("any")), columnMetas));
   }
 
   @Test
@@ -129,6 +135,7 @@ public class TestStatisticsFilter {
         getIntColumnMeta(statsSomeNulls, 177L),
         getDoubleColumnMeta(doubleStats, 177L))));
 
+    assertFalse(canDrop(eq(missingColumn, null), columnMetas));
   }
 
   @Test
@@ -144,6 +151,17 @@ public class TestStatisticsFilter {
         getIntColumnMeta(allSevens, 177L),
         getDoubleColumnMeta(doubleStats, 177L))));
 
+    allSevens.setNumNulls(100L);
+    assertFalse(canDrop(notEq(intColumn, 7), Arrays.asList(
+        getIntColumnMeta(allSevens, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    allSevens.setNumNulls(177L);
+    assertFalse(canDrop(notEq(intColumn, 7), Arrays.asList(
+        getIntColumnMeta(allSevens, 177L),
+        getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertFalse(canDrop(notEq(missingColumn, fromString("any")), columnMetas));
   }
 
   @Test
@@ -171,6 +189,8 @@ public class TestStatisticsFilter {
     assertTrue(canDrop(notEq(intColumn, null), Arrays.asList(
         getIntColumnMeta(statsAllNulls, 177L),
         getDoubleColumnMeta(doubleStats, 177L))));
+
+    assertTrue(canDrop(notEq(missingColumn, null), columnMetas));
   }
 
   @Test
@@ -182,6 +202,8 @@ public class TestStatisticsFilter {
 
     assertTrue(canDrop(lt(intColumn, 0), nullColumnMetas));
     assertTrue(canDrop(lt(intColumn, 7), nullColumnMetas));
+
+    assertTrue(canDrop(lt(missingColumn, fromString("any")), columnMetas));
   }
 
   @Test
@@ -193,6 +215,8 @@ public class TestStatisticsFilter {
 
     assertTrue(canDrop(ltEq(intColumn, 0), nullColumnMetas));
     assertTrue(canDrop(ltEq(intColumn, 7), nullColumnMetas));
+
+    assertTrue(canDrop(ltEq(missingColumn, fromString("any")), columnMetas));
   }
 
   @Test
@@ -204,6 +228,8 @@ public class TestStatisticsFilter {
 
     assertTrue(canDrop(gt(intColumn, 0), nullColumnMetas));
     assertTrue(canDrop(gt(intColumn, 7), nullColumnMetas));
+
+    assertTrue(canDrop(gt(missingColumn, fromString("any")), columnMetas));
   }
 
   @Test
@@ -215,6 +241,8 @@ public class TestStatisticsFilter {
 
     assertTrue(canDrop(gtEq(intColumn, 0), nullColumnMetas));
     assertTrue(canDrop(gtEq(intColumn, 7), nullColumnMetas));
+
+    assertTrue(canDrop(gtEq(missingColumn, fromString("any")), columnMetas));
   }
 
   @Test
@@ -311,15 +339,4 @@ public class TestStatisticsFilter {
     }
   }
 
-  @Test
-  public void testMissingColumn() {
-    List<ColumnChunkMetaData> columnMetas = Arrays.asList(getIntColumnMeta(new IntStatistics(), 0L));
-    try {
-      canDrop(and(eq(doubleColumn, 12.0), eq(intColumn, 17)), columnMetas);
-      fail("This should throw");
-    } catch (IllegalArgumentException e) {
-      assertEquals("Column double.column not found in schema!", e.getMessage());
-    }
-  }
-
 }