You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pa...@apache.org on 2016/11/05 00:11:35 UTC

[08/10] drill git commit: DRILL-1950: Parquet rowgroup level filter pushdown in query planning time.

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
index adada23..70d31b1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestUnionAll.java
@@ -1090,7 +1090,7 @@ public class TestUnionAll extends BaseTestQuery{
 
     final String query = String.format("SELECT o_custkey FROM \n" +
         " (select o1.o_custkey from dfs_test.`%s` o1 inner join dfs_test.`%s` o2 on o1.o_orderkey = o2.o_custkey) \n" +
-        " Union All SELECT o_custkey FROM dfs_test.`%s` where o_custkey < 10", l, r, l);
+        " Union All SELECT o_custkey FROM dfs_test.`%s` where o_custkey > 10", l, r, l);
 
     // Validate the plan
     final String[] expectedPlan = {"(?s)UnionExchange.*UnionAll.*HashJoin.*"};

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
index c13dc48..bfecf52 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
@@ -21,7 +21,13 @@ import com.codahale.metrics.MetricRegistry;
 import com.google.common.io.Files;
 import mockit.NonStrictExpectations;
 import org.apache.commons.io.FileUtils;
+import org.antlr.runtime.ANTLRStringStream;
+import org.antlr.runtime.CommonTokenStream;
+import org.antlr.runtime.RecognitionException;
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.parser.ExprLexer;
+import org.apache.drill.common.expression.parser.ExprParser;
 import org.apache.drill.common.scanner.ClassPathScanner;
 import org.apache.drill.exec.compile.CodeCompilerTestFactory;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
@@ -91,4 +97,12 @@ public class ExecTest extends DrillTest {
     }};
   }
 
+  protected LogicalExpression parseExpr(String expr) throws RecognitionException {
+    final ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr));
+    final CommonTokenStream tokens = new CommonTokenStream(lexer);
+    final ExprParser parser = new ExprParser(tokens);
+    final ExprParser.parse_return ret = parser.parse();
+    return ret.e;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
index a8e8814..af2ee46 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/ExpressionTest.java
@@ -120,14 +120,6 @@ public class ExpressionTest extends ExecTest {
 
   // HELPER METHODS //
 
-  private LogicalExpression parseExpr(String expr) throws RecognitionException {
-    final ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr));
-    final CommonTokenStream tokens = new CommonTokenStream(lexer);
-    final ExprParser parser = new ExprParser(tokens);
-    parse_return ret = parser.parse();
-    return ret.e;
-  }
-
   private String getExpressionCode(String expression, RecordBatch batch) throws Exception {
     final LogicalExpression expr = parseExpr(expression);
     final ErrorCollector error = new ErrorCollectorImpl();

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
index 4d2ad02..722d45e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
@@ -203,14 +203,6 @@ public class ExpressionInterpreterTest  extends PopUnitTestBase {
     }
   }
 
-  private LogicalExpression parseExpr(String expr) throws RecognitionException {
-    final ExprLexer lexer = new ExprLexer(new ANTLRStringStream(expr));
-    final CommonTokenStream tokens = new CommonTokenStream(lexer);
-    final ExprParser parser = new ExprParser(tokens);
-    final ExprParser.parse_return ret = parser.parse();
-    return ret.e;
-  }
-
   private ValueVector evalExprWithInterpreter(String expression, RecordBatch batch, Drillbit bit) throws Exception {
     final LogicalExpression expr = parseExpr(expression);
     final ErrorCollector error = new ErrorCollectorImpl();

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
index 6059a5b..fa28001 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestNestedLoopJoin.java
@@ -149,7 +149,7 @@ public class TestNestedLoopJoin extends PlanTestBase {
   public void testNlJoinEqualityNonScalar_2_planning() throws Exception {
     String query = String.format("select n.n_nationkey from cp.`tpch/nation.parquet` n, "
         + " dfs_test.`%s/multilevel/parquet` o "
-        + " where n.n_regionkey = o.o_orderkey and o.o_custkey < 5", TEST_RES_PATH);
+        + " where n.n_regionkey = o.o_orderkey and o.o_custkey > 5", TEST_RES_PATH);
     test("alter session set `planner.slice_target` = 1");
     test(DISABLE_HJ);
     test(DISABLE_MJ);

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
index b4a9e79..162b5bf 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java
@@ -265,7 +265,7 @@ public class TestFileGenerator {
       w.endBlock();
     }
     w.end(new HashMap<String, String>());
-    logger.debug("Finished generating parquet file.");
+    logger.debug("Finished generating parquet file {}", path.getName());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
new file mode 100644
index 0000000..1ad000e
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.zookeeper.ZooDefs.OpCode.create;
+import static org.junit.Assert.assertEquals;
+
+public class TestParquetFilterPushDown extends PlanTestBase {
+
+  private static final String WORKING_PATH = TestTools.getWorkingPath();
+  private static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";
+  private static FragmentContext fragContext;
+
+  static FileSystem fs;
+
+  @BeforeClass
+  public static void initFSAndCreateFragContext() throws Exception {
+    fragContext = new FragmentContext(bits[0].getContext(),
+        BitControl.PlanFragment.getDefaultInstance(), null, bits[0].getContext().getFunctionImplementationRegistry());
+
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "local");
+
+    fs = FileSystem.get(conf);
+  }
+
+  @AfterClass
+  public static void close() throws Exception {
+    fragContext.close();
+    fs.close();
+  }
+
+  @Test
+  // Test filter evaluation directly without go through SQL queries.
+  public void testIntPredicateWithEval() throws Exception {
+    // intTbl.parquet has only one int column
+    //    intCol : [0, 100].
+    final String filePath = String.format("%s/parquetFilterPush/intTbl/intTbl.parquet", TEST_RES_PATH);
+    ParquetMetadata footer = getParquetMetaData(filePath);
+
+    testParquetRowGroupFilterEval(footer, "intCol = 100", false);
+    testParquetRowGroupFilterEval(footer, "intCol = 0", false);
+    testParquetRowGroupFilterEval(footer, "intCol = 50", false);
+
+    testParquetRowGroupFilterEval(footer, "intCol = -1", true);
+    testParquetRowGroupFilterEval(footer, "intCol = 101", true);
+
+    testParquetRowGroupFilterEval(footer, "intCol > 100", true);
+    testParquetRowGroupFilterEval(footer, "intCol > 99", false);
+
+    testParquetRowGroupFilterEval(footer, "intCol >= 100", false);
+    testParquetRowGroupFilterEval(footer, "intCol >= 101", true);
+
+    testParquetRowGroupFilterEval(footer, "intCol < 100", false);
+    testParquetRowGroupFilterEval(footer, "intCol < 1", false);
+    testParquetRowGroupFilterEval(footer, "intCol < 0", true);
+
+    testParquetRowGroupFilterEval(footer, "intCol <= 100", false);
+    testParquetRowGroupFilterEval(footer, "intCol <= 1", false);
+    testParquetRowGroupFilterEval(footer, "intCol <= 0", false);
+    testParquetRowGroupFilterEval(footer, "intCol <= -1", true);
+
+    // "and"
+    testParquetRowGroupFilterEval(footer, "intCol > 100 and intCol  < 200", true);
+    testParquetRowGroupFilterEval(footer, "intCol > 50 and intCol < 200", false);
+    testParquetRowGroupFilterEval(footer, "intCol > 50 and intCol > 200", true); // essentially, intCol > 200
+
+    // "or"
+    testParquetRowGroupFilterEval(footer, "intCol = 150 or intCol = 160", true);
+    testParquetRowGroupFilterEval(footer, "intCol = 50 or intCol = 160", false);
+
+    //"nonExistCol" does not exist in the table. "AND" with a filter on exist column
+    testParquetRowGroupFilterEval(footer, "intCol > 100 and nonExistCol = 100", true);
+    testParquetRowGroupFilterEval(footer, "intCol > 50 and nonExistCol = 100", true); // since nonExistCol = 100 -> Unknown -> could drop.
+    testParquetRowGroupFilterEval(footer, "nonExistCol = 100 and intCol > 50", true); // since nonExistCol = 100 -> Unknown -> could drop.
+    testParquetRowGroupFilterEval(footer, "intCol > 100 and nonExistCol < 'abc'", true);
+    testParquetRowGroupFilterEval(footer, "nonExistCol < 'abc' and intCol > 100", true); // nonExistCol < 'abc' hit NumberException and is ignored, but intCol >100 will say "drop".
+    testParquetRowGroupFilterEval(footer, "intCol > 50 and nonExistCol < 'abc'", false); // because nonExistCol < 'abc' hit NumberException and is ignored.
+
+    //"nonExistCol" does not exist in the table. "OR" with a filter on exist column
+    testParquetRowGroupFilterEval(footer, "intCol > 100 or nonExistCol = 100", true); // nonExistCol = 100 -> could drop.
+    testParquetRowGroupFilterEval(footer, "nonExistCol = 100 or intCol > 100", true); // nonExistCol = 100 -> could drop.
+    testParquetRowGroupFilterEval(footer, "intCol > 50 or nonExistCol < 100", false);
+    testParquetRowGroupFilterEval(footer, "nonExistCol < 100 or intCol > 50", false);
+
+    // cast function on column side (LHS)
+    testParquetRowGroupFilterEval(footer, "cast(intCol as bigint) = 100", false);
+    testParquetRowGroupFilterEval(footer, "cast(intCol as bigint) = 0", false);
+    testParquetRowGroupFilterEval(footer, "cast(intCol as bigint) = 50", false);
+    testParquetRowGroupFilterEval(footer, "cast(intCol as bigint) = 101", true);
+    testParquetRowGroupFilterEval(footer, "cast(intCol as bigint) = -1", true);
+
+    // cast function on constant side (RHS)
+    testParquetRowGroupFilterEval(footer, "intCol = cast(100 as bigint)", false);
+    testParquetRowGroupFilterEval(footer, "intCol = cast(0 as bigint)", false);
+    testParquetRowGroupFilterEval(footer, "intCol = cast(50 as bigint)", false);
+    testParquetRowGroupFilterEval(footer, "intCol = cast(101 as bigint)", true);
+    testParquetRowGroupFilterEval(footer, "intCol = cast(-1 as bigint)", true);
+
+    // cast into float4/float8
+    testParquetRowGroupFilterEval(footer, "cast(intCol as float4) = cast(101.0 as float4)", true);
+    testParquetRowGroupFilterEval(footer, "cast(intCol as float4) = cast(-1.0 as float4)", true);
+    testParquetRowGroupFilterEval(footer, "cast(intCol as float4) = cast(1.0 as float4)", false);
+
+    testParquetRowGroupFilterEval(footer, "cast(intCol as float8) = 101.0", true);
+    testParquetRowGroupFilterEval(footer, "cast(intCol as float8) = -1.0", true);
+    testParquetRowGroupFilterEval(footer, "cast(intCol as float8) = 1.0", false);
+  }
+
+  @Test
+  public void testIntPredicateAgainstAllNullColWithEval() throws Exception {
+    // intAllNull.parquet has only one int column with all values being NULL.
+    // column values statistics: num_nulls: 25, min/max is not defined
+    final String filePath = String.format("%s/parquetFilterPush/intTbl/intAllNull.parquet", TEST_RES_PATH);
+    ParquetMetadata footer = getParquetMetaData(filePath);
+
+    testParquetRowGroupFilterEval(footer, "intCol = 100", true);
+    testParquetRowGroupFilterEval(footer, "intCol = 0", true);
+    testParquetRowGroupFilterEval(footer, "intCol = -100", true);
+
+    testParquetRowGroupFilterEval(footer, "intCol > 10", true);
+    testParquetRowGroupFilterEval(footer, "intCol >= 10", true);
+
+    testParquetRowGroupFilterEval(footer, "intCol < 10", true);
+    testParquetRowGroupFilterEval(footer, "intCol <= 10", true);
+  }
+
+  @Test
+  public void testDatePredicateAgainstDrillCTAS1_8WithEval() throws Exception {
+    // The parquet file is created on drill 1.8.0 with DRILL CTAS:
+    //   create table dfs.tmp.`dateTblCorrupted/t1` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-01' and date '1992-01-03';
+
+    final String filePath = String.format("%s/parquetFilterPush/dateTblCorrupted/t1/0_0_0.parquet", TEST_RES_PATH);
+    ParquetMetadata footer = getParquetMetaData(filePath);
+
+    testDatePredicateAgainstDrillCTASHelper(footer);
+  }
+
+  @Test
+  public void testDatePredicateAgainstDrillCTASPost1_8WithEval() throws Exception {
+    // The parquet file is created on drill 1.9.0-SNAPSHOT (commit id:03e8f9f3e01c56a9411bb4333e4851c92db6e410) with DRILL CTAS:
+    //   create table dfs.tmp.`dateTbl1_9/t1` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-01' and date '1992-01-03';
+
+    final String filePath = String.format("%s/parquetFilterPush/dateTbl1_9/t1/0_0_0.parquet", TEST_RES_PATH);
+    ParquetMetadata footer = getParquetMetaData(filePath);
+
+    testDatePredicateAgainstDrillCTASHelper(footer);
+  }
+
+
+  private void testDatePredicateAgainstDrillCTASHelper(ParquetMetadata footer) throws Exception{
+    testParquetRowGroupFilterEval(footer, "o_orderdate = cast('1992-01-01' as date)", false);
+    testParquetRowGroupFilterEval(footer, "o_orderdate = cast('1991-12-31' as date)", true);
+
+    testParquetRowGroupFilterEval(footer, "o_orderdate >= cast('1991-12-31' as date)", false);
+    testParquetRowGroupFilterEval(footer, "o_orderdate >= cast('1992-01-03' as date)", false);
+    testParquetRowGroupFilterEval(footer, "o_orderdate >= cast('1992-01-04' as date)", true);
+
+    testParquetRowGroupFilterEval(footer, "o_orderdate > cast('1992-01-01' as date)", false);
+    testParquetRowGroupFilterEval(footer, "o_orderdate > cast('1992-01-03' as date)", true);
+
+    testParquetRowGroupFilterEval(footer, "o_orderdate <= cast('1992-01-01' as date)", false);
+    testParquetRowGroupFilterEval(footer, "o_orderdate <= cast('1991-12-31' as date)", true);
+
+    testParquetRowGroupFilterEval(footer, "o_orderdate < cast('1992-01-02' as date)", false);
+    testParquetRowGroupFilterEval(footer, "o_orderdate < cast('1992-01-01' as date)", true);
+  }
+
+  @Test
+  public void testTimeStampPredicateWithEval() throws Exception {
+    // Table dateTblCorrupted is created by CTAS in drill 1.8.0.
+    //    create table dfs.tmp.`tsTbl/t1` as select DATE_ADD(cast(o_orderdate as date), INTERVAL '0 10:20:30' DAY TO SECOND) as o_ordertimestamp from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-01' and date '1992-01-03';
+    final String filePath = String.format("%s/parquetFilterPush/tsTbl/t1/0_0_0.parquet", TEST_RES_PATH);
+    ParquetMetadata footer = getParquetMetaData(filePath);
+
+    testParquetRowGroupFilterEval(footer, "o_ordertimestamp = cast('1992-01-01 10:20:30' as timestamp)", false);
+    testParquetRowGroupFilterEval(footer, "o_ordertimestamp = cast('1992-01-01 10:20:29' as timestamp)", true);
+
+    testParquetRowGroupFilterEval(footer, "o_ordertimestamp >= cast('1992-01-01 10:20:29' as timestamp)", false);
+    testParquetRowGroupFilterEval(footer, "o_ordertimestamp >= cast('1992-01-03 10:20:30' as timestamp)", false);
+    testParquetRowGroupFilterEval(footer, "o_ordertimestamp >= cast('1992-01-03 10:20:31' as timestamp)", true);
+
+    testParquetRowGroupFilterEval(footer, "o_ordertimestamp > cast('1992-01-03 10:20:29' as timestamp)", false);
+    testParquetRowGroupFilterEval(footer, "o_ordertimestamp > cast('1992-01-03 10:20:30' as timestamp)", true);
+
+    testParquetRowGroupFilterEval(footer, "o_ordertimestamp <= cast('1992-01-01 10:20:30' as timestamp)", false);
+    testParquetRowGroupFilterEval(footer, "o_ordertimestamp <= cast('1992-01-01 10:20:29' as timestamp)", true);
+
+    testParquetRowGroupFilterEval(footer, "o_ordertimestamp < cast('1992-01-01 10:20:31' as timestamp)", false);
+    testParquetRowGroupFilterEval(footer, "o_ordertimestamp < cast('1992-01-01 10:20:30' as timestamp)", true);
+
+  }
+
+  @Test
+  // Test against parquet files from Drill CTAS post 1.8.0 release.
+  public void testDatePredicateAgaistDrillCTASPost1_8() throws  Exception {
+    String tableName = "order_ctas";
+
+    try {
+      deleteTableIfExists(tableName);
+
+      test("use dfs_test.tmp");
+      test(String.format("create table `%s/t1` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-01' and date '1992-01-03'", tableName));
+      test(String.format("create table `%s/t2` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-04' and date '1992-01-06'", tableName));
+      test(String.format("create table `%s/t3` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-07' and date '1992-01-09'", tableName));
+
+      final String query1 = "select o_orderdate from dfs_test.tmp.order_ctas where o_orderdate = date '1992-01-01'";
+      testParquetFilterPD(query1, 9, 1, false);
+
+      final String query2 = "select o_orderdate from dfs_test.tmp.order_ctas where o_orderdate < date '1992-01-01'";
+      testParquetFilterPD(query2, 0, 1, false);
+
+      final String query3 = "select o_orderdate from dfs_test.tmp.order_ctas where o_orderdate between date '1992-01-01' and date '1992-01-03'";
+      testParquetFilterPD(query3, 22, 1, false);
+
+      final String query4 = "select o_orderdate from dfs_test.tmp.order_ctas where o_orderdate between date '1992-01-01' and date '1992-01-04'";
+      testParquetFilterPD(query4, 33, 2, false);
+
+      final String query5 = "select o_orderdate from dfs_test.tmp.order_ctas where o_orderdate between date '1992-01-01' and date '1992-01-06'";
+      testParquetFilterPD(query5, 49, 2, false);
+
+      final String query6 = "select o_orderdate from dfs_test.tmp.order_ctas where o_orderdate > date '1992-01-10'";
+      testParquetFilterPD(query6, 0, 1, false);
+
+      // Test parquet files with metadata cache files available.
+      // Now, create parquet metadata cache files, and run the above queries again. Flag "usedMetadataFile" should be true.
+      test(String.format("refresh table metadata %s", tableName));
+
+      testParquetFilterPD(query1, 9, 1, true);
+
+      testParquetFilterPD(query2, 0, 1, true);
+
+      testParquetFilterPD(query3, 22, 1, true);
+
+      testParquetFilterPD(query4, 33, 2, true);
+
+      testParquetFilterPD(query5, 49, 2, true);
+
+      testParquetFilterPD(query6, 0, 1, true);
+    } finally {
+      deleteTableIfExists(tableName);
+    }
+  }
+
+  @Test
+  public void testParquetFilterPDOptionsDisabled() throws Exception {
+    final String tableName = "order_ctas";
+
+    try {
+      deleteTableIfExists(tableName);
+
+      test("alter session set `" + PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_KEY  + "` = false");
+
+      test("use dfs_test.tmp");
+      test(String.format("create table `%s/t1` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-01' and date '1992-01-03'", tableName));
+      test(String.format("create table `%s/t2` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-04' and date '1992-01-06'", tableName));
+      test(String.format("create table `%s/t3` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-07' and date '1992-01-09'", tableName));
+
+      final String query1 = "select o_orderdate from dfs_test.tmp.order_ctas where o_orderdate = date '1992-01-01'";
+      testParquetFilterPD(query1, 9, 3, false);
+
+    } finally {
+      test("alter session set `" + PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_KEY  + "` = " + PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING.getDefault().bool_val);
+      deleteTableIfExists(tableName);
+    }
+  }
+
+  @Test
+  public void testParquetFilterPDOptionsThreshold() throws Exception {
+    final String tableName = "order_ctas";
+
+    try {
+      deleteTableIfExists(tableName);
+
+      test("alter session set `" + PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY  + "` = 2 ");
+
+      test("use dfs_test.tmp");
+      test(String.format("create table `%s/t1` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-01' and date '1992-01-03'", tableName));
+      test(String.format("create table `%s/t2` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-04' and date '1992-01-06'", tableName));
+      test(String.format("create table `%s/t3` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-07' and date '1992-01-09'", tableName));
+
+      final String query1 = "select o_orderdate from dfs_test.tmp.order_ctas where o_orderdate = date '1992-01-01'";
+      testParquetFilterPD(query1, 9, 3, false);
+
+    } finally {
+      test("alter session set `" + PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD_KEY  + "` = " + PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.getDefault().num_val);
+      deleteTableIfExists(tableName);
+    }
+  }
+
+  @Test
+  public void testDatePredicateAgainstCorruptedDateCol() throws Exception {
+    // Table dateTblCorrupted is created by CTAS in drill 1.8.0. Per DRILL-4203, the date column is shifted by some value.
+    // The CTAS are the following, then copy to drill test resource directory.
+    //    create table dfs.tmp.`dateTblCorrupted/t1` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-01' and date '1992-01-03';
+    //    create table dfs.tmp.`dateTblCorrupted/t2` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-04' and date '1992-01-06';
+    //    create table dfs.tmp.`dateTblCorrupted/t3` as select cast(o_orderdate as date) as o_orderdate from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-07' and date '1992-01-09';
+
+    final String query1 = String.format("select o_orderdate from dfs_test.`%s/parquetFilterPush/dateTblCorrupted` where o_orderdate = date '1992-01-01'", TEST_RES_PATH);
+    testParquetFilterPD(query1, 9, 1, false);
+
+    final String query2 = String.format("select o_orderdate from dfs_test.`%s/parquetFilterPush/dateTblCorrupted` where o_orderdate < date '1992-01-01'", TEST_RES_PATH);
+    testParquetFilterPD(query2, 0, 1, false);
+
+    final String query3 = String.format("select o_orderdate from dfs_test.`%s/parquetFilterPush/dateTblCorrupted` where o_orderdate between date '1992-01-01' and date '1992-01-03'", TEST_RES_PATH);
+    testParquetFilterPD(query3, 22, 1, false);
+
+    final String query4 = String.format("select o_orderdate from dfs_test.`%s/parquetFilterPush/dateTblCorrupted` where o_orderdate between date '1992-01-01' and date '1992-01-04'", TEST_RES_PATH);
+    testParquetFilterPD(query4, 33, 2, false);
+
+    final String query5 = String.format("select o_orderdate from dfs_test.`%s/parquetFilterPush/dateTblCorrupted` where o_orderdate between date '1992-01-01' and date '1992-01-06'", TEST_RES_PATH);
+    testParquetFilterPD(query5, 49, 2, false);
+
+    final String query6 = String.format("select o_orderdate from dfs_test.`%s/parquetFilterPush/dateTblCorrupted` where o_orderdate > date '1992-01-10'", TEST_RES_PATH);
+
+    testParquetFilterPD(query6, 0, 1, false);
+  }
+
+  @Test
+  public void testTimeStampPredicate() throws Exception {
+    // Table dateTblCorrupted is created by CTAS in drill 1.8.0.
+    //    create table dfs.tmp.`tsTbl/t1` as select DATE_ADD(cast(o_orderdate as date), INTERVAL '0 10:20:30' DAY TO SECOND) as o_ordertimestamp from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-01' and date '1992-01-03';
+    //    create table dfs.tmp.`tsTbl/t2` as select DATE_ADD(cast(o_orderdate as date), INTERVAL '0 10:20:30' DAY TO SECOND) as o_ordertimestamp from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-04' and date '1992-01-06';
+    //    create table dfs.tmp.`tsTbl/t3` as select DATE_ADD(cast(o_orderdate as date), INTERVAL '0 10:20:30' DAY TO SECOND) as o_ordertimestamp from cp.`tpch/orders.parquet` where o_orderdate between date '1992-01-07' and date '1992-01-09';
+
+    final String query1 = String.format("select o_ordertimestamp from dfs_test.`%s/parquetFilterPush/tsTbl` where o_ordertimestamp = timestamp '1992-01-01 10:20:30'", TEST_RES_PATH);
+    testParquetFilterPD(query1, 9, 1, false);
+
+    final String query2 = String.format("select o_ordertimestamp from dfs_test.`%s/parquetFilterPush/tsTbl` where o_ordertimestamp < timestamp '1992-01-01 10:20:30'", TEST_RES_PATH);
+    testParquetFilterPD(query2, 0, 1, false);
+
+    final String query3 = String.format("select o_ordertimestamp from dfs_test.`%s/parquetFilterPush/tsTbl` where o_ordertimestamp between timestamp '1992-01-01 00:00:00' and timestamp '1992-01-06 10:20:30'", TEST_RES_PATH);
+    testParquetFilterPD(query3, 49, 2, false);
+  }
+
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+  // Some test helper functions.
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
+  private void testParquetFilterPD(final String query, int expectedRowCount, int expectedNumFiles, boolean usedMetadataFile) throws Exception{
+    int actualRowCount = testSql(query);
+    assertEquals(expectedRowCount, actualRowCount);
+    String numFilesPattern = "numFiles=" + expectedNumFiles;
+    String usedMetaPattern = "usedMetadataFile=" + usedMetadataFile;
+
+    testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern}, new String[] {});
+  }
+
+  private void testParquetRowGroupFilterEval(final ParquetMetadata footer, final String exprStr,
+      boolean canDropExpected) throws Exception{
+    final LogicalExpression filterExpr = parseExpr(exprStr);
+    testParquetRowGroupFilterEval(footer, 0, filterExpr, canDropExpected);
+  }
+
+  private void testParquetRowGroupFilterEval(final ParquetMetadata footer, final int rowGroupIndex,
+      final LogicalExpression filterExpr, boolean canDropExpected) throws Exception {
+    boolean canDrop = ParquetRGFilterEvaluator.evalFilter(filterExpr, footer, rowGroupIndex,
+        fragContext.getOptions(), fragContext);
+    Assert.assertEquals(canDropExpected, canDrop);
+  }
+
+  private ParquetMetadata getParquetMetaData(String filePathStr) throws IOException{
+    Configuration fsConf = new Configuration();
+    ParquetMetadata footer = ParquetFileReader.readFooter(fsConf, new Path(filePathStr));
+    return footer;
+  }
+
+  private static void deleteTableIfExists(String tableName) {
+    try {
+      Path path = new Path(getDfsTestTmpSchemaLocation(), tableName);
+      if (fs.exists(path)) {
+        fs.delete(path, true);
+      }
+    } catch (Exception e) {
+      // ignore exceptions.
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/resources/parquetFilterPush/dateTbl1_9/t1/0_0_0.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquetFilterPush/dateTbl1_9/t1/0_0_0.parquet b/exec/java-exec/src/test/resources/parquetFilterPush/dateTbl1_9/t1/0_0_0.parquet
new file mode 100644
index 0000000..bd4f8e7
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquetFilterPush/dateTbl1_9/t1/0_0_0.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/resources/parquetFilterPush/dateTblCorrupted/t1/0_0_0.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquetFilterPush/dateTblCorrupted/t1/0_0_0.parquet b/exec/java-exec/src/test/resources/parquetFilterPush/dateTblCorrupted/t1/0_0_0.parquet
new file mode 100644
index 0000000..0ff9bdd
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquetFilterPush/dateTblCorrupted/t1/0_0_0.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/resources/parquetFilterPush/dateTblCorrupted/t2/0_0_0.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquetFilterPush/dateTblCorrupted/t2/0_0_0.parquet b/exec/java-exec/src/test/resources/parquetFilterPush/dateTblCorrupted/t2/0_0_0.parquet
new file mode 100644
index 0000000..cf28b54
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquetFilterPush/dateTblCorrupted/t2/0_0_0.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/resources/parquetFilterPush/dateTblCorrupted/t3/0_0_0.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquetFilterPush/dateTblCorrupted/t3/0_0_0.parquet b/exec/java-exec/src/test/resources/parquetFilterPush/dateTblCorrupted/t3/0_0_0.parquet
new file mode 100644
index 0000000..5fd9853
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquetFilterPush/dateTblCorrupted/t3/0_0_0.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/resources/parquetFilterPush/intTbl/intAllNull.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquetFilterPush/intTbl/intAllNull.parquet b/exec/java-exec/src/test/resources/parquetFilterPush/intTbl/intAllNull.parquet
new file mode 100644
index 0000000..06eb81d
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquetFilterPush/intTbl/intAllNull.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/resources/parquetFilterPush/intTbl/intTbl.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquetFilterPush/intTbl/intTbl.parquet b/exec/java-exec/src/test/resources/parquetFilterPush/intTbl/intTbl.parquet
new file mode 100644
index 0000000..9943078
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquetFilterPush/intTbl/intTbl.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/resources/parquetFilterPush/tsTbl/t1/0_0_0.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquetFilterPush/tsTbl/t1/0_0_0.parquet b/exec/java-exec/src/test/resources/parquetFilterPush/tsTbl/t1/0_0_0.parquet
new file mode 100644
index 0000000..f0498c6
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquetFilterPush/tsTbl/t1/0_0_0.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/resources/parquetFilterPush/tsTbl/t2/0_0_0.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquetFilterPush/tsTbl/t2/0_0_0.parquet b/exec/java-exec/src/test/resources/parquetFilterPush/tsTbl/t2/0_0_0.parquet
new file mode 100644
index 0000000..4da4e6b
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquetFilterPush/tsTbl/t2/0_0_0.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/exec/java-exec/src/test/resources/parquetFilterPush/tsTbl/t3/0_0_0.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquetFilterPush/tsTbl/t3/0_0_0.parquet b/exec/java-exec/src/test/resources/parquetFilterPush/tsTbl/t3/0_0_0.parquet
new file mode 100644
index 0000000..ee0c92c
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquetFilterPush/tsTbl/t3/0_0_0.parquet differ

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/logical/src/main/java/org/apache/drill/common/expression/ValueExpressions.java
----------------------------------------------------------------------
diff --git a/logical/src/main/java/org/apache/drill/common/expression/ValueExpressions.java b/logical/src/main/java/org/apache/drill/common/expression/ValueExpressions.java
index 2fd8e67..662258d 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/ValueExpressions.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/ValueExpressions.java
@@ -59,6 +59,10 @@ public class ValueExpressions {
     return new org.apache.drill.common.expression.ValueExpressions.DateExpression(date.getTimeInMillis());
   }
 
+  public static LogicalExpression getDate(long milliSecond){
+    return new org.apache.drill.common.expression.ValueExpressions.DateExpression(milliSecond);
+  }
+
   public static LogicalExpression getTime(GregorianCalendar time) {
       int millis = time.get(GregorianCalendar.HOUR_OF_DAY) * 60 * 60 * 1000 +
                    time.get(GregorianCalendar.MINUTE) * 60 * 1000 +
@@ -68,9 +72,18 @@ public class ValueExpressions {
       return new TimeExpression(millis);
   }
 
+  public static LogicalExpression getTime(int milliSeconds) {
+    return new TimeExpression(milliSeconds);
+  }
+
   public static LogicalExpression getTimeStamp(GregorianCalendar date) {
     return new org.apache.drill.common.expression.ValueExpressions.TimeStampExpression(date.getTimeInMillis());
   }
+
+  public static LogicalExpression getTimeStamp(long milliSeconds) {
+    return new org.apache.drill.common.expression.ValueExpressions.TimeStampExpression(milliSeconds);
+  }
+
   public static LogicalExpression getIntervalYear(int months) {
     return new IntervalYearExpression(months);
   }
@@ -140,6 +153,8 @@ public class ValueExpressions {
 
   public static class BooleanExpression extends ValueExpression<Boolean> {
 
+    public static final BooleanExpression TRUE = new BooleanExpression("true", ExpressionPosition.UNKNOWN);
+    public static final BooleanExpression FALSE = new BooleanExpression("false", ExpressionPosition.UNKNOWN);
 
     public BooleanExpression(String value, ExpressionPosition pos) {
       super(value, pos);

http://git-wip-us.apache.org/repos/asf/drill/blob/9411b26e/logical/src/main/java/org/apache/drill/common/expression/fn/CastFunctions.java
----------------------------------------------------------------------
diff --git a/logical/src/main/java/org/apache/drill/common/expression/fn/CastFunctions.java b/logical/src/main/java/org/apache/drill/common/expression/fn/CastFunctions.java
index af25dd7..43b3b2e 100644
--- a/logical/src/main/java/org/apache/drill/common/expression/fn/CastFunctions.java
+++ b/logical/src/main/java/org/apache/drill/common/expression/fn/CastFunctions.java
@@ -177,6 +177,15 @@ public class CastFunctions {
         CAST_FUNC_REPLACEMENT_NEEDED.contains(originalfunction);
   }
 
+  /**
+   * Check if a funcName is one of the cast function.
+   * @param funcName
+   * @return
+   */
+  public static boolean isCastFunction(String funcName) {
+    return TYPE2FUNC.values().contains(funcName);
+  }
+
   private static String getReplacingCastFunctionFromNonNullable(String originalCastFunction, MinorType inputType) {
     if(inputType == MinorType.VARCHAR && CAST_FUNC_REPLACEMENT_FROM_NONNULLABLE_VARCHAR.containsKey(originalCastFunction)) {
       return CAST_FUNC_REPLACEMENT_FROM_NONNULLABLE_VARCHAR.get(originalCastFunction);