You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/04/13 23:43:11 UTC
spark git commit: [SQL][SPARK-6742]: Don't push down predicates which reference partition column(s)
Repository: spark
Updated Branches:
refs/heads/master 85ee0cabe -> 3a205bbd9
[SQL][SPARK-6742]: Don't push down predicates which reference partition column(s)
cc liancheng
Author: Yash Datta <Ya...@guavus.com>
Closes #5390 from saucam/fpush and squashes the following commits:
3f026d6 [Yash Datta] SPARK-6742: Fix scalastyle
ce3d702 [Yash Datta] SPARK-6742: Add test case, fix scalastyle
8592acc [Yash Datta] SPARK-6742: Don't push down predicates which reference partition column(s)
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a205bbd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a205bbd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a205bbd
Branch: refs/heads/master
Commit: 3a205bbd9e352668a020c3146391e1e4441467af
Parents: 85ee0ca
Author: Yash Datta <Ya...@guavus.com>
Authored: Mon Apr 13 14:43:07 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Apr 13 14:43:07 2015 -0700
----------------------------------------------------------------------
.../spark/sql/execution/SparkStrategies.scala | 11 ++++++++-
.../spark/sql/parquet/ParquetFilterSuite.scala | 24 +++++++++++++++++++-
2 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3a205bbd/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5268b73..f0d92ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -215,6 +215,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
table: ParquetRelation, partition, child, overwrite, ifNotExists) =>
InsertIntoParquetTable(table, planLater(child), overwrite) :: Nil
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) =>
+ val partitionColNames = relation.partitioningAttributes.map(_.name).toSet
+ val filtersToPush = filters.filter { pred =>
+ val referencedColNames = pred.references.map(_.name).toSet
+ referencedColNames.intersect(partitionColNames).isEmpty
+ }
val prunePushedDownFilters =
if (sqlContext.conf.parquetFilterPushDown) {
(predicates: Seq[Expression]) => {
@@ -226,6 +231,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// "A AND B" in the higher-level filter, not just "B".
predicates.map(p => p -> ParquetFilters.createFilter(p)).collect {
case (predicate, None) => predicate
+ // Filter needs to be applied above when it contains partitioning
+ // columns
+ case (predicate, _) if(!predicate.references.map(_.name).toSet
+ .intersect (partitionColNames).isEmpty) => predicate
}
}
} else {
@@ -238,7 +247,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
ParquetTableScan(
_,
relation,
- if (sqlContext.conf.parquetFilterPushDown) filters else Nil)) :: Nil
+ if (sqlContext.conf.parquetFilterPushDown) filtersToPush else Nil)) :: Nil
case _ => Nil
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3a205bbd/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 6a2c2a7..10d0ede 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -22,7 +22,7 @@ import parquet.filter2.predicate.Operators._
import parquet.filter2.predicate.{FilterPredicate, Operators}
import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, Predicate, Row}
+import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.test.TestSQLContext
@@ -350,4 +350,26 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
override protected def afterAll(): Unit = {
sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
}
+
+ test("SPARK-6742: don't push down predicates which reference partition columns") {
+ import sqlContext.implicits._
+
+ withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+ withTempPath { dir =>
+ val path = s"${dir.getCanonicalPath}/part=1"
+ (1 to 3).map(i => (i, i.toString)).toDF("a", "b").saveAsParquetFile(path)
+
+ // If the "part = 1" filter gets pushed down, this query will throw an exception since
+ // "part" is not a valid column in the actual Parquet file
+ val df = DataFrame(sqlContext, org.apache.spark.sql.parquet.ParquetRelation(
+ path,
+ Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext,
+ Seq(AttributeReference("part", IntegerType, false)()) ))
+
+ checkAnswer(
+ df.filter("a = 1 or part = 1"),
+ (1 to 3).map(i => Row(1, i, i.toString)))
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org