You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2015/07/31 16:49:35 UTC

hive git commit: HIVE-11401: Predicate push down does not work with Parquet when partitions are in the expression (Sergio Pena, reviewed by Szehon Ho)

Repository: hive
Updated Branches:
  refs/heads/master 7df9d7a93 -> 724b31930


HIVE-11401: Predicate push down does not work with Parquet when partitions are in the expression (Sergio Pena, reviewed by Szehon Ho)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/724b3193
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/724b3193
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/724b3193

Branch: refs/heads/master
Commit: 724b31930718eea606dfe6d95eda7385209caa5f
Parents: 7df9d7a
Author: Sergio Pena <se...@cloudera.com>
Authored: Fri Jul 31 09:48:28 2015 -0500
Committer: Sergio Pena <se...@cloudera.com>
Committed: Fri Jul 31 09:48:28 2015 -0500

----------------------------------------------------------------------
 .../read/ParquetFilterPredicateConverter.java   | 148 +++++++++++++++++++
 .../read/ParquetRecordReaderWrapper.java        | 122 ++-------------
 .../parquet/TestParquetRecordReaderWrapper.java |  14 +-
 .../read/TestParquetFilterPredicate.java        |  51 +++++++
 .../ql/io/sarg/TestConvertAstToSearchArg.java   |  25 ++--
 .../clientpositive/parquet_predicate_pushdown.q |   9 ++
 .../parquet_predicate_pushdown.q.out            |  47 ++++++
 7 files changed, 283 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/724b3193/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetFilterPredicateConverter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetFilterPredicateConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetFilterPredicateConverter.java
new file mode 100644
index 0000000..f170026
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetFilterPredicateConverter.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.read;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.io.parquet.FilterPredicateLeafBuilder;
+import org.apache.hadoop.hive.ql.io.parquet.LeafFilterFactory;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class ParquetFilterPredicateConverter {
+  private static final Log LOG = LogFactory.getLog(ParquetFilterPredicateConverter.class);
+
+  /**
+   * Translate the search argument to the filter predicate parquet uses
+   * @return translate the sarg into a filter predicate
+   */
+  public static FilterPredicate toFilterPredicate(SearchArgument sarg) {
+    return toFilterPredicate(sarg, null);
+  }
+
+  /**
+   * Translate the search argument to the filter predicate parquet uses. It includes
+   * only the columns from the passed schema.
+   * @return translate the sarg into a filter predicate
+   */
+  public static FilterPredicate toFilterPredicate(SearchArgument sarg, MessageType schema) {
+    Set<String> columns = null;
+    if (schema != null) {
+      columns = new HashSet<String>();
+      for (Type field : schema.getFields()) {
+        columns.add(field.getName());
+      }
+    }
+
+    return translate(sarg.getExpression(), sarg.getLeaves(), columns);
+  }
+
+  private static FilterPredicate translate(ExpressionTree root, List<PredicateLeaf> leaves, Set<String> columns) {
+    FilterPredicate p = null;
+    switch (root.getOperator()) {
+      case OR:
+        for(ExpressionTree child: root.getChildren()) {
+          if (p == null) {
+            p = translate(child, leaves, columns);
+          } else {
+            FilterPredicate right = translate(child, leaves, columns);
+            // constant means no filter, ignore it when it is null
+            if(right != null){
+              p = FilterApi.or(p, right);
+            }
+          }
+        }
+        return p;
+      case AND:
+        for(ExpressionTree child: root.getChildren()) {
+          if (p == null) {
+            p = translate(child, leaves, columns);
+          } else {
+            FilterPredicate right = translate(child, leaves, columns);
+            // constant means no filter, ignore it when it is null
+            if(right != null){
+              p = FilterApi.and(p, right);
+            }
+          }
+        }
+        return p;
+      case NOT:
+        FilterPredicate op = translate(root.getChildren().get(0), leaves, columns);
+        if (op != null) {
+          return FilterApi.not(op);
+        } else {
+          return null;
+        }
+      case LEAF:
+        PredicateLeaf leaf = leaves.get(root.getLeaf());
+
+        // If columns is null, then we need to create the leaf
+        if (columns == null || columns.contains(leaf.getColumnName())) {
+          return buildFilterPredicateFromPredicateLeaf(leaf);
+        } else {
+          // Do not create predicate if the leaf is not on the passed schema.
+          return null;
+        }
+      case CONSTANT:
+        return null;// no filter will be executed for constant
+      default:
+        throw new IllegalStateException("Unknown operator: " +
+            root.getOperator());
+    }
+  }
+
+  private static FilterPredicate buildFilterPredicateFromPredicateLeaf
+      (PredicateLeaf leaf) {
+    LeafFilterFactory leafFilterFactory = new LeafFilterFactory();
+    FilterPredicateLeafBuilder builder;
+    try {
+      builder = leafFilterFactory
+          .getLeafFilterBuilderByType(leaf.getType());
+      if (builder == null) {
+        return null;
+      }
+      if (isMultiLiteralsOperator(leaf.getOperator())) {
+        return builder.buildPredicate(leaf.getOperator(),
+            leaf.getLiteralList(),
+            leaf.getColumnName());
+      } else {
+        return builder
+            .buildPredict(leaf.getOperator(),
+                leaf.getLiteral(),
+                leaf.getColumnName());
+      }
+    } catch (Exception e) {
+      LOG.error("fail to build predicate filter leaf with errors" + e, e);
+      return null;
+    }
+  }
+
+  private static boolean isMultiLiteralsOperator(PredicateLeaf.Operator op) {
+    return (op == PredicateLeaf.Operator.IN) ||
+        (op == PredicateLeaf.Operator.BETWEEN);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/724b3193/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
index 49e52da..f689b90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java
@@ -22,17 +22,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.IOConstants;
-import org.apache.hadoop.hive.ql.io.parquet.FilterPredicateLeafBuilder;
-import org.apache.hadoop.hive.ql.io.parquet.LeafFilterFactory;
 import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
-import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
-import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
@@ -46,7 +39,6 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.RowGroupFilter;
-import org.apache.parquet.filter2.predicate.FilterApi;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetInputFormat;
@@ -57,6 +49,7 @@ import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.ContextUtil;
+import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
 
 import com.google.common.base.Strings;
@@ -139,26 +132,23 @@ public class ParquetRecordReaderWrapper  implements RecordReader<NullWritable, A
     }
   }
 
-  public FilterCompat.Filter setFilter(final JobConf conf) {
-    String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
-    String columnNamesString =
-      conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
-    if (serializedPushdown == null || columnNamesString == null || serializedPushdown.isEmpty() ||
-      columnNamesString.isEmpty()) {
+  public FilterCompat.Filter setFilter(final JobConf conf, MessageType schema) {
+    SearchArgument sarg = ConvertAstToSearchArg.createFromConf(conf);
+    if (sarg == null) {
       return null;
     }
 
-    SearchArgument sarg =
-        ConvertAstToSearchArg.create(Utilities.deserializeExpression
-            (serializedPushdown));
-    FilterPredicate p = toFilterPredicate(sarg);
+    // Create the Parquet FilterPredicate without including columns that do not exist
+    // on the shema (such as partition columns).
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema);
     if (p != null) {
-      LOG.debug("Predicate filter for parquet is " + p.toString());
+      // Filter may have sensitive information. Do not send to debug.
+      LOG.debug("PARQUET predicate push down generated.");
       ParquetInputFormat.setFilterPredicate(conf, p);
       return FilterCompat.get(p);
     } else {
-      LOG.debug("No predicate filter can be generated for " + TableScanDesc.FILTER_EXPR_CONF_STR +
-        " with the value of " + serializedPushdown);
+      // Filter may have sensitive information. Do not send to debug.
+      LOG.debug("No PARQUET predicate push down is generated.");
       return null;
     }
   }
@@ -250,7 +240,6 @@ public class ParquetRecordReaderWrapper  implements RecordReader<NullWritable, A
     if (oldSplit instanceof FileSplit) {
       final Path finalPath = ((FileSplit) oldSplit).getPath();
       jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent());
-      FilterCompat.Filter filter = setFilter(jobConf);
 
       final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath);
       final List<BlockMetaData> blocks = parquetMetadata.getBlocks();
@@ -274,6 +263,7 @@ public class ParquetRecordReaderWrapper  implements RecordReader<NullWritable, A
         return null;
       }
 
+      FilterCompat.Filter filter = setFilter(jobConf, fileMetaData.getSchema());
       if (filter != null) {
         filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema());
         if (filtedBlocks.isEmpty()) {
@@ -310,92 +300,4 @@ public class ParquetRecordReaderWrapper  implements RecordReader<NullWritable, A
   public List<BlockMetaData> getFiltedBlocks() {
     return filtedBlocks;
   }
-
-  /**
-   * Translate the search argument to the filter predicate parquet used
-   * @return translate the sarg into a filter predicate
-   */
-  public static FilterPredicate toFilterPredicate(SearchArgument sarg) {
-    return translate(sarg.getExpression(), sarg.getLeaves());
-  }
-
-  private static boolean isMultiLiteralsOperator(PredicateLeaf.Operator op) {
-    return (op == PredicateLeaf.Operator.IN) ||
-        (op == PredicateLeaf.Operator.BETWEEN);
-  }
-
-  private static FilterPredicate translate(ExpressionTree root,
-                                           List<PredicateLeaf> leafs){
-    FilterPredicate p = null;
-    switch (root.getOperator()) {
-      case OR:
-        for(ExpressionTree child: root.getChildren()) {
-          if (p == null) {
-            p = translate(child, leafs);
-          } else {
-            FilterPredicate right = translate(child, leafs);
-            // constant means no filter, ignore it when it is null
-            if(right != null){
-              p = FilterApi.or(p, right);
-            }
-          }
-        }
-        return p;
-      case AND:
-        for(ExpressionTree child: root.getChildren()) {
-          if (p == null) {
-            p = translate(child, leafs);
-          } else {
-            FilterPredicate right = translate(child, leafs);
-            // constant means no filter, ignore it when it is null
-            if(right != null){
-              p = FilterApi.and(p, right);
-            }
-          }
-        }
-        return p;
-      case NOT:
-        FilterPredicate op = translate(root.getChildren().get(0), leafs);
-        if (op != null) {
-          return FilterApi.not(op);
-        } else {
-          return null;
-        }
-      case LEAF:
-        return buildFilterPredicateFromPredicateLeaf(leafs.get(root.getLeaf()));
-      case CONSTANT:
-        return null;// no filter will be executed for constant
-      default:
-        throw new IllegalStateException("Unknown operator: " +
-            root.getOperator());
-    }
-  }
-
-  private static FilterPredicate buildFilterPredicateFromPredicateLeaf
-          (PredicateLeaf leaf) {
-    LeafFilterFactory leafFilterFactory = new LeafFilterFactory();
-    FilterPredicateLeafBuilder builder;
-    try {
-      builder = leafFilterFactory
-          .getLeafFilterBuilderByType(leaf.getType());
-      if (builder == null) {
-        return null;
-      }
-      if (isMultiLiteralsOperator(leaf.getOperator())) {
-        return builder.buildPredicate(leaf.getOperator(),
-            leaf.getLiteralList(),
-            leaf.getColumnName());
-      } else {
-        return builder
-            .buildPredict(leaf.getOperator(),
-                leaf.getLiteral(),
-                leaf.getColumnName());
-      }
-    } catch (Exception e) {
-      LOG.error("fail to build predicate filter leaf with errors" + e, e);
-      return null;
-    }
-  }
-
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/724b3193/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRecordReaderWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRecordReaderWrapper.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRecordReaderWrapper.java
index 87dd344..f9ca528 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRecordReaderWrapper.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestParquetRecordReaderWrapper.java
@@ -22,7 +22,7 @@ import static junit.framework.Assert.assertEquals;
 
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
-import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -56,7 +56,7 @@ public class TestParquetRecordReaderWrapper {
         .end()
         .build();
 
-    FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg);
     String expected =
       "and(and(and(not(eq(x, null)), not(and(lt(y, 20), not(lteq(y, 10))))), not(or(or(eq(z, 1), " +
         "eq(z, 2)), eq(z, 3)))), not(eq(a, Binary{\"stinger\"})))";
@@ -76,7 +76,7 @@ public class TestParquetRecordReaderWrapper {
             .end()
             .build();
     assertEquals("lteq(y, Binary{\"hi        \"})",
-        ParquetRecordReaderWrapper.toFilterPredicate(sarg).toString());
+        ParquetFilterPredicateConverter.toFilterPredicate(sarg).toString());
 
     sarg = SearchArgumentFactory.newBuilder()
         .startNot()
@@ -91,7 +91,7 @@ public class TestParquetRecordReaderWrapper {
         .end()
         .build();
 
-    FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg);
     String expected =
         "and(and(not(eq(x, null)), not(or(or(eq(z, 1), eq(z, 2)), eq(z, 3)))), " +
         "not(eq(a, Binary{\"stinger\"})))";
@@ -111,7 +111,7 @@ public class TestParquetRecordReaderWrapper {
             .end()
             .build();
     assertEquals("lteq(y, Binary{\"hi        \"})",
-        ParquetRecordReaderWrapper.toFilterPredicate(sarg).toString());
+        ParquetFilterPredicateConverter.toFilterPredicate(sarg).toString());
 
     sarg = SearchArgumentFactory.newBuilder()
         .startNot()
@@ -126,7 +126,7 @@ public class TestParquetRecordReaderWrapper {
         .end()
         .build();
 
-    FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg);
     String expected = "and(and(not(eq(x, null)), not(or(or(eq(z, 1), eq(z, 2)), eq(z, 3)))), " +
         "not(eq(a, Binary{\"stinger\"})))";
     assertEquals(expected, p.toString());
@@ -146,7 +146,7 @@ public class TestParquetRecordReaderWrapper {
             .end()
             .build();
 
-    FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg);
     String expected = "and(and(and(and(lt(x, 22), lt(x1, 22))," +
         " lteq(y, Binary{\"hi        \"})), eq(z, " +
         "0.22)), eq(z1, 0.22))";

http://git-wip-us.apache.org/repos/asf/hive/blob/724b3193/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestParquetFilterPredicate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestParquetFilterPredicate.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestParquetFilterPredicate.java
new file mode 100644
index 0000000..847a02b
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/read/TestParquetFilterPredicate.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet.read;
+
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+
+public class TestParquetFilterPredicate {
+  @Test
+  public void testFilterColumnsThatDoNoExistOnSchema() {
+    MessageType schema = MessageTypeParser.parseMessageType("message test { required int32 a; required binary stinger; }");
+    SearchArgument sarg = SearchArgumentFactory.newBuilder()
+        .startNot()
+        .startOr()
+        .isNull("a", PredicateLeaf.Type.INTEGER)
+        .between("y", PredicateLeaf.Type.INTEGER, 10, 20) // Column will be removed from filter
+        .in("z", PredicateLeaf.Type.INTEGER, 1, 2, 3) // Column will be removed from filter
+        .nullSafeEquals("a", PredicateLeaf.Type.STRING, "stinger")
+        .end()
+        .end()
+        .build();
+
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg, schema);
+
+    String expected = "and(not(eq(a, null)), not(eq(a, Binary{\"stinger\"})))";
+    assertEquals(expected, p.toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/724b3193/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
index 85e952f..9e8425a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestConvertAstToSearchArg.java
@@ -24,22 +24,15 @@ import static junit.framework.Assert.assertTrue;
 
 import com.google.common.collect.Sets;
 
-import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetFilterPredicateConverter;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl.PredicateLeafImpl;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.junit.Test;
 
 import java.beans.XMLDecoder;
 import java.io.ByteArrayInputStream;
 import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Field;
-import java.sql.Date;
-import java.sql.Timestamp;
 import java.util.List;
 import java.util.Set;
 
@@ -557,7 +550,7 @@ public class TestConvertAstToSearchArg {
     List<PredicateLeaf> leaves = sarg.getLeaves();
     assertEquals(9, leaves.size());
 
-    FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg);
     String[] conditions = new String[]{
       "eq(first_name, Binary{\"john\"})",    /* first_name = 'john' */
       "not(lteq(first_name, Binary{\"greg\"}))", /* 'greg' < first_name */
@@ -849,7 +842,7 @@ public class TestConvertAstToSearchArg {
       "lteq(id, 4)"                         /* id <= 4             */
     };
 
-    FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg);
     String expected = String.format("or(or(or(%1$s, %2$s), %3$s), %4$s)", conditions);
     assertEquals(expected, p.toString());
 
@@ -1279,7 +1272,7 @@ public class TestConvertAstToSearchArg {
       "eq(last_name, Binary{\"smith\"})"    /* 'smith' = last_name  */
     };
 
-    FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg);
     String expected = String.format("and(and(and(%1$s, %2$s), %3$s), %4$s)", conditions);
     assertEquals(expected, p.toString());
 
@@ -1500,7 +1493,7 @@ public class TestConvertAstToSearchArg {
       "or(eq(id, 34), eq(id, 50))" /* id in (34,50) */
     };
 
-    FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg);
     String expected = String.format("and(and(%1$s, %2$s), %3$s)", conditions);
     assertEquals(expected, p.toString());
 
@@ -1759,7 +1752,7 @@ public class TestConvertAstToSearchArg {
     List<PredicateLeaf> leaves = sarg.getLeaves();
     assertEquals(1, leaves.size());
 
-    FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg);
     String expected =
       "and(lt(first_name, Binary{\"greg\"}), not(lteq(first_name, Binary{\"david\"})))";
     assertEquals(p.toString(), expected);
@@ -2239,7 +2232,7 @@ public class TestConvertAstToSearchArg {
     List<PredicateLeaf> leaves = sarg.getLeaves();
     assertEquals(9, leaves.size());
 
-    FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg);
     String expected = "and(and(and(and(and(and(and(and(and(and(and(and(and(and(and(and(and(" +
       "or(or(or(lt(id, 18), lt(id, 10)), lt(id, 13)), lt(id, 16)), " +
       "or(or(or(lt(id, 18), lt(id, 11)), lt(id, 13)), lt(id, 16))), " +
@@ -2395,7 +2388,7 @@ public class TestConvertAstToSearchArg {
     List<PredicateLeaf> leaves = sarg.getLeaves();
     assertEquals(0, leaves.size());
 
-    FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg);
     assertNull(p);
 
     assertEquals("YES_NO_NULL",
@@ -2650,7 +2643,7 @@ public class TestConvertAstToSearchArg {
     List<PredicateLeaf> leaves = sarg.getLeaves();
     assertEquals(1, leaves.size());
 
-    FilterPredicate p = ParquetRecordReaderWrapper.toFilterPredicate(sarg);
+    FilterPredicate p = ParquetFilterPredicateConverter.toFilterPredicate(sarg);
     String expected = "and(not(lt(id, 10)), not(lt(id, 10)))";
     assertEquals(expected, p.toString());
 

http://git-wip-us.apache.org/repos/asf/hive/blob/724b3193/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q b/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q
new file mode 100644
index 0000000..08af84f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/parquet_predicate_pushdown.q
@@ -0,0 +1,9 @@
+SET hive.optimize.index.filter=true;
+SET hive.optimize.ppd=true;
+
+-- Test predicate with partitioned columns
+CREATE TABLE part1 (id int, content string) PARTITIONED BY (p string) STORED AS PARQUET;
+ALTER TABLE part1 ADD PARTITION (p='p1');
+INSERT INTO TABLE part1 PARTITION (p='p1') VALUES (1, 'a'), (2, 'b');
+SELECT * FROM part1 WHERE p='p1';
+DROP TABLE part1 PURGE;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/724b3193/ql/src/test/results/clientpositive/parquet_predicate_pushdown.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/parquet_predicate_pushdown.q.out b/ql/src/test/results/clientpositive/parquet_predicate_pushdown.q.out
new file mode 100644
index 0000000..4186618
--- /dev/null
+++ b/ql/src/test/results/clientpositive/parquet_predicate_pushdown.q.out
@@ -0,0 +1,47 @@
+PREHOOK: query: -- Test predicate with partitioned columns
+CREATE TABLE part1 (id int, content string) PARTITIONED BY (p string) STORED AS PARQUET
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@part1
+POSTHOOK: query: -- Test predicate with partitioned columns
+CREATE TABLE part1 (id int, content string) PARTITIONED BY (p string) STORED AS PARQUET
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@part1
+PREHOOK: query: ALTER TABLE part1 ADD PARTITION (p='p1')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@part1
+POSTHOOK: query: ALTER TABLE part1 ADD PARTITION (p='p1')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@part1
+POSTHOOK: Output: default@part1@p=p1
+PREHOOK: query: INSERT INTO TABLE part1 PARTITION (p='p1') VALUES (1, 'a'), (2, 'b')
+PREHOOK: type: QUERY
+PREHOOK: Input: default@values__tmp__table__1
+PREHOOK: Output: default@part1@p=p1
+POSTHOOK: query: INSERT INTO TABLE part1 PARTITION (p='p1') VALUES (1, 'a'), (2, 'b')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@values__tmp__table__1
+POSTHOOK: Output: default@part1@p=p1
+POSTHOOK: Lineage: part1 PARTITION(p=p1).content SIMPLE [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col2, type:string, comment:), ]
+POSTHOOK: Lineage: part1 PARTITION(p=p1).id EXPRESSION [(values__tmp__table__1)values__tmp__table__1.FieldSchema(name:tmp_values_col1, type:string, comment:), ]
+PREHOOK: query: SELECT * FROM part1 WHERE p='p1'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@part1
+PREHOOK: Input: default@part1@p=p1
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM part1 WHERE p='p1'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@part1
+POSTHOOK: Input: default@part1@p=p1
+#### A masked pattern was here ####
+1	a	p1
+2	b	p1
+PREHOOK: query: DROP TABLE part1 PURGE
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@part1
+PREHOOK: Output: default@part1
+POSTHOOK: query: DROP TABLE part1 PURGE
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@part1
+POSTHOOK: Output: default@part1