You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2015/10/15 01:29:36 UTC

spark git commit: [SPARK-10829] [SQL] Filter combine partition key and attribute doesn't work in DataSource scan

Repository: spark
Updated Branches:
  refs/heads/master 2b5e31c7e -> 1baaf2b9b


[SPARK-10829] [SQL] Filter combine partition key and attribute doesn't work in DataSource scan

```scala
withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
      withTempPath { dir =>
        val path = s"${dir.getCanonicalPath}/part=1"
        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)

        // If the "part = 1" filter gets pushed down, this query will throw an exception since
        // "part" is not a valid column in the actual Parquet file
        checkAnswer(
          sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
          (2 to 3).map(i => Row(i, i.toString, 1)))
      }
    }
```

We expect the result to be:
```
2,1
3,1
```
But got
```
1,1
2,1
3,1
```

Author: Cheng Hao <ha...@intel.com>

Closes #8916 from chenghao-intel/partition_filter.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1baaf2b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1baaf2b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1baaf2b9

Branch: refs/heads/master
Commit: 1baaf2b9bd7c949a8f95cd14fc1be2a56e1139b3
Parents: 2b5e31c
Author: Cheng Hao <ha...@intel.com>
Authored: Wed Oct 14 16:29:32 2015 -0700
Committer: Cheng Lian <li...@databricks.com>
Committed: Wed Oct 14 16:29:32 2015 -0700

----------------------------------------------------------------------
 .../datasources/DataSourceStrategy.scala        | 34 +++++++++++++-------
 .../parquet/ParquetFilterSuite.scala            | 17 ++++++++++
 2 files changed, 39 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1baaf2b9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 918db8e..33181fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -62,7 +62,22 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     // Scanning partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
         if t.partitionSpec.partitionColumns.nonEmpty =>
-      val selectedPartitions = prunePartitions(filters, t.partitionSpec).toArray
+      // We divide the filter expressions into 3 parts
+      val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
+
+      // TODO this is case-sensitive
+      // Only prunning the partition keys
+      val partitionFilters =
+        filters.filter(_.references.map(_.name).toSet.subsetOf(partitionColumnNames))
+
+      // Only pushes down predicates that do not reference partition keys.
+      val pushedFilters =
+        filters.filter(_.references.map(_.name).toSet.intersect(partitionColumnNames).isEmpty)
+
+      // Predicates with both partition keys and attributes
+      val combineFilters = filters.toSet -- partitionFilters.toSet -- pushedFilters.toSet
+
+      val selectedPartitions = prunePartitions(partitionFilters, t.partitionSpec).toArray
 
       logInfo {
         val total = t.partitionSpec.partitions.length
@@ -71,21 +86,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         s"Selected $selected partitions out of $total, pruned $percentPruned% partitions."
       }
 
-      // Only pushes down predicates that do not reference partition columns.
-      val pushedFilters = {
-        val partitionColumnNames = t.partitionSpec.partitionColumns.map(_.name).toSet
-        filters.filter { f =>
-          val referencedColumnNames = f.references.map(_.name).toSet
-          referencedColumnNames.intersect(partitionColumnNames).isEmpty
-        }
-      }
-
-      buildPartitionedTableScan(
+      val scan = buildPartitionedTableScan(
         l,
         projects,
         pushedFilters,
         t.partitionSpec.partitionColumns,
-        selectedPartitions) :: Nil
+        selectedPartitions)
+
+      combineFilters
+        .reduceLeftOption(expressions.And)
+        .map(execution.Filter(_, scan)).getOrElse(scan) :: Nil
 
     // Scanning non-partitioned HadoopFsRelation
     case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/1baaf2b9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 45ad3fd..7a23f57 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -297,4 +297,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-10829: Filter combine partition key and attribute doesn't work in DataSource scan") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
+
+        // If the "part = 1" filter gets pushed down, this query will throw an exception since
+        // "part" is not a valid column in the actual Parquet file
+        checkAnswer(
+          sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
+          (2 to 3).map(i => Row(i, i.toString, 1)))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org