Posted to commits@spark.apache.org by ma...@apache.org on 2015/04/13 23:43:11 UTC

spark git commit: [SQL][SPARK-6742]: Don't push down predicates which reference partition column(s)

Repository: spark
Updated Branches:
  refs/heads/master 85ee0cabe -> 3a205bbd9


[SQL][SPARK-6742]: Don't push down predicates which reference partition column(s)

cc liancheng

Author: Yash Datta <Ya...@guavus.com>

Closes #5390 from saucam/fpush and squashes the following commits:

3f026d6 [Yash Datta] SPARK-6742: Fix scalastyle
ce3d702 [Yash Datta] SPARK-6742: Add test case, fix scalastyle
8592acc [Yash Datta] SPARK-6742: Don't push down predicates which reference partition column(s)
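
The gist of the change: partition columns (e.g. "part" in a path like .../part=1/) exist only in directory names, never inside the Parquet files themselves, so a Parquet-level filter referencing them cannot be evaluated by the reader. A minimal standalone sketch of the check this patch adds (the example column names "a" and "part" are illustrative; the real code operates on Catalyst Expressions, as the diff below shows):

    // A predicate may be pushed down to Parquet only if it references no
    // partition columns; otherwise it must stay in the Filter above the scan.
    val partitionColNames = Set("part")
    val referencedColNames = Set("a", "part") // columns referenced by "a = 1 OR part = 1"
    val pushable = referencedColNames.intersect(partitionColNames).isEmpty // false => keep above the scan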


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a205bbd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a205bbd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a205bbd

Branch: refs/heads/master
Commit: 3a205bbd9e352668a020c3146391e1e4441467af
Parents: 85ee0ca
Author: Yash Datta <Ya...@guavus.com>
Authored: Mon Apr 13 14:43:07 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Apr 13 14:43:07 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/SparkStrategies.scala   | 11 ++++++++-
 .../spark/sql/parquet/ParquetFilterSuite.scala  | 24 +++++++++++++++++++-
 2 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3a205bbd/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5268b73..f0d92ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -215,6 +215,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           table: ParquetRelation, partition, child, overwrite, ifNotExists) =>
         InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
       case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
+        val partitionColNames = relation.partitioningAttributes.map(_.name).toSet
+        val filtersToPush = filters.filter { pred =>
+            val referencedColNames = pred.references.map(_.name).toSet
+            referencedColNames.intersect(partitionColNames).isEmpty
+          }
         val prunePushedDownFilters =
           if (sqlContext.conf.parquetFilterPushDown) {
             (predicates: Seq[Expression]) => {
@@ -226,6 +231,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
               // "A AND B" in the higher-level filter, not just "B".
               predicates.map(p => p -> ParquetFilters.createFilter(p)).collect {
                 case (predicate, None) => predicate
+                // Filter needs to be applied above when it contains partitioning
+                // columns
+                case (predicate, _) if(!predicate.references.map(_.name).toSet
+                  .intersect (partitionColNames).isEmpty) => predicate
               }
             }
           } else {
@@ -238,7 +247,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           ParquetTableScan(
             _,
             relation,
-            if (sqlContext.conf.parquetFilterPushDown) filters else Nil)) :: Nil
+            if (sqlContext.conf.parquetFilterPushDown) filtersToPush else Nil)) :: Nil
 
       case _ => Nil
     }
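
Note the two coordinated pieces in the hunk above: filtersToPush drops partition-column predicates from what ParquetTableScan receives, while prunePushedDownFilters keeps those same predicates (along with any that ParquetFilters.createFilter cannot convert) in the higher-level Filter so they are still evaluated. A self-contained sketch of that split (splitFilters is an assumed helper name for illustration, not part of the patch):

    import org.apache.spark.sql.catalyst.expressions.Expression

    // Partition the filters: the first Seq is safe to push down to Parquet,
    // the second must be evaluated in the Filter above the scan.
    def splitFilters(
        filters: Seq[Expression],
        partitionColNames: Set[String]): (Seq[Expression], Seq[Expression]) =
      filters.partition { pred =>
        pred.references.map(_.name).toSet.intersect(partitionColNames).isEmpty
      }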

http://git-wip-us.apache.org/repos/asf/spark/blob/3a205bbd/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 6a2c2a7..10d0ede 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -22,7 +22,7 @@ import parquet.filter2.predicate.Operators._
 import parquet.filter2.predicate.{FilterPredicate, Operators}
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, Predicate, Row}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.sources.LogicalRelation
 import org.apache.spark.sql.test.TestSQLContext
@@ -350,4 +350,26 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
   override protected def afterAll(): Unit = {
     sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+  
+  test("SPARK-6742: don't push down predicates which reference partition columns") {
+    import sqlContext.implicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").saveAsParquetFile(path)
+
+        // If the "part = 1" filter gets pushed down, this query will throw an exception since
+        // "part" is not a valid column in the actual Parquet file
+        val df = DataFrame(sqlContext, org.apache.spark.sql.parquet.ParquetRelation(
+          path,
+          Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext,
+          Seq(AttributeReference("part", IntegerType, false)()) ))
+       
+        checkAnswer(
+          df.filter("a = 1 or part = 1"),
+          (1 to 3).map(i => Row(1, i, i.toString)))
+      }
+    }
+  }
 }
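
The regression test above builds a ParquetRelation whose only partitioning attribute is "part", then filters on a disjunction mixing a data column and the partition column. Before this patch the whole predicate was handed to the Parquet reader, which throws because "part" is not a column in the files on disk; with the patch the predicate stays above the scan. A short usage sketch (df as constructed in the test):

    // Mixing data and partition columns in one predicate now plans cleanly:
    // the disjunction is evaluated by Spark, not by the Parquet reader.
    df.filter("a = 1 or part = 1").show()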

