You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2015/12/09 08:15:42 UTC

spark git commit: [SPARK-11676][SQL] Parquet filter tests all pass if filters are not really pushed down

Repository: spark
Updated Branches:
  refs/heads/master 3934562d3 -> f6883bb7a


[SPARK-11676][SQL] Parquet filter tests all pass if filters are not really pushed down

Currently, the Parquet predicate tests all pass even if filters are not pushed down or filter pushdown is disabled.

In this PR, to check that filters are evaluated, the test simply takes the expression from `expression.Filter` and then tries to create filters just like Spark does.

To check the results, the test manually accesses the child RDD (of `expression.Filter`), produces the results, which should already be filtered properly, and then compares them to the expected values.

Now, if filters are not pushed down or filter pushdown is disabled, the tests throw exceptions.

Author: hyukjinkwon <gu...@gmail.com>

Closes #9659 from HyukjinKwon/SPARK-11676.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6883bb7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6883bb7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6883bb7

Branch: refs/heads/master
Commit: f6883bb7afa7d5df480e1c2b3db6cb77198550be
Parents: 3934562
Author: hyukjinkwon <gu...@gmail.com>
Authored: Wed Dec 9 15:15:30 2015 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Wed Dec 9 15:15:30 2015 +0800

----------------------------------------------------------------------
 .../parquet/ParquetFilterSuite.scala            | 69 ++++++++++++--------
 1 file changed, 41 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f6883bb7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index cc5aae0..daf41bc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -50,27 +50,33 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     val output = predicate.collect { case a: Attribute => a }.distinct
 
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
-      val query = df
-        .select(output.map(e => Column(e)): _*)
-        .where(Column(predicate))
-
-      val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
-        case PhysicalOperation(_, filters, LogicalRelation(_: ParquetRelation, _)) => filters
-      }.flatten.reduceLeftOption(_ && _)
-      assert(maybeAnalyzedPredicate.isDefined)
-
-      val selectedFilters = maybeAnalyzedPredicate.flatMap(DataSourceStrategy.translateFilter)
-      assert(selectedFilters.nonEmpty)
-
-      selectedFilters.foreach { pred =>
-        val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
-        assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
-        maybeFilter.foreach { f =>
-          // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
-          assert(f.getClass === filterClass)
+      withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
+        val query = df
+          .select(output.map(e => Column(e)): _*)
+          .where(Column(predicate))
+
+        var maybeRelation: Option[ParquetRelation] = None
+        val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
+          case PhysicalOperation(_, filters, LogicalRelation(relation: ParquetRelation, _)) =>
+            maybeRelation = Some(relation)
+            filters
+        }.flatten.reduceLeftOption(_ && _)
+        assert(maybeAnalyzedPredicate.isDefined, "No filter is analyzed from the given query")
+
+        val (_, selectedFilters) =
+          DataSourceStrategy.selectFilters(maybeRelation.get, maybeAnalyzedPredicate.toSeq)
+        assert(selectedFilters.nonEmpty, "No filter is pushed down")
+
+        selectedFilters.foreach { pred =>
+          val maybeFilter = ParquetFilters.createFilter(df.schema, pred)
+          assert(maybeFilter.isDefined, s"Couldn't generate filter predicate for $pred")
+          maybeFilter.foreach { f =>
+            // Doesn't bother checking type parameters here (e.g. `Eq[Integer]`)
+            assert(f.getClass === filterClass)
+          }
         }
+        checker(stripSparkFilter(query), expected)
       }
-      checker(query, expected)
     }
   }
 
@@ -104,6 +110,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
     checkBinaryFilterPredicate(predicate, filterClass, Seq(Row(expected)))(df)
   }
 
+  /**
+   * Strip Spark-side filtering in order to check if a datasource filters rows correctly.
+   */
+  protected def stripSparkFilter(df: DataFrame): DataFrame = {
+    val schema = df.schema
+    val childRDD = df
+      .queryExecution
+      .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
+      .child
+      .execute()
+      .map(row => Row.fromSeq(row.toSeq(schema)))
+
+    sqlContext.createDataFrame(childRDD, schema)
+  }
+
   test("filter pushdown - boolean") {
     withParquetDataFrame((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { implicit df =>
       checkFilterPredicate('_1.isNull, classOf[Eq[_]], Seq.empty[Row])
@@ -347,19 +368,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
           (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
           val df = sqlContext.read.parquet(path).filter("a = 2")
 
-          // This is the source RDD without Spark-side filtering.
-          val childRDD =
-            df
-              .queryExecution
-              .executedPlan.asInstanceOf[org.apache.spark.sql.execution.Filter]
-              .child
-              .execute()
-
           // The result should be single row.
           // When a filter is pushed to Parquet, Parquet can apply it to every row.
           // So, we can check the number of rows returned from the Parquet
           // to make sure our filter pushdown work.
-          assert(childRDD.count == 1)
+          assert(stripSparkFilter(df).count == 1)
         }
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org