You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/08/02 02:46:24 UTC
spark git commit: [SPARK-16818] Exchange reuse incorrectly reuses scans over different sets of partitions
Repository: spark
Updated Branches:
refs/heads/branch-2.0 1813bbd9b -> 5fbf5f93e
[SPARK-16818] Exchange reuse incorrectly reuses scans over different sets of partitions
https://github.com/apache/spark/pull/14425 rebased for branch-2.0
Author: Eric Liang <ek...@databricks.com>
Closes #14427 from ericl/spark-16818-br-2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5fbf5f93
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5fbf5f93
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5fbf5f93
Branch: refs/heads/branch-2.0
Commit: 5fbf5f93ee5aa4d1aca0fa0c8fb769a085dd7b93
Parents: 1813bbd
Author: Eric Liang <ek...@databricks.com>
Authored: Mon Aug 1 19:46:20 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Aug 1 19:46:20 2016 -0700
----------------------------------------------------------------------
.../datasources/FileSourceStrategy.scala | 2 ++
.../datasources/FileSourceStrategySuite.scala | 35 +++++++++++++++++++-
2 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/5fbf5f93/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 13a86bf..8af9562 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -202,7 +202,9 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
partitions
}
+ // These metadata values make scan plans uniquely identifiable for equality checking.
val meta = Map(
+ "PartitionFilters" -> partitionKeyFilters.mkString("[", ", ", "]"),
"Format" -> files.fileFormat.toString,
"ReadSchema" -> prunedDataSchema.simpleString,
PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
http://git-wip-us.apache.org/repos/asf/spark/blob/5fbf5f93/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 8d8a18f..7a24f21 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
import org.apache.spark.sql.catalyst.util
-import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -407,6 +407,39 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
}
}
+ test("[SPARK-16818] partition pruned file scans implement sameResult correctly") {
+ withTempPath { path =>
+ val tempDir = path.getCanonicalPath
+ spark.range(100)
+ .selectExpr("id", "id as b")
+ .write
+ .partitionBy("id")
+ .parquet(tempDir)
+ val df = spark.read.parquet(tempDir)
+ def getPlan(df: DataFrame): SparkPlan = {
+ df.queryExecution.executedPlan
+ }
+ assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2"))))
+ assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3"))))
+ }
+ }
+
+ test("[SPARK-16818] exchange reuse respects differences in partition pruning") {
+ spark.conf.set("spark.sql.exchange.reuse", true)
+ withTempPath { path =>
+ val tempDir = path.getCanonicalPath
+ spark.range(10)
+ .selectExpr("id % 2 as a", "id % 3 as b", "id as c")
+ .write
+ .partitionBy("a")
+ .parquet(tempDir)
+ val df = spark.read.parquet(tempDir)
+ val df1 = df.where("a = 0").groupBy("b").agg("c" -> "sum")
+ val df2 = df.where("a = 1").groupBy("b").agg("c" -> "sum")
+ checkAnswer(df1.join(df2, "b"), Row(0, 6, 12) :: Row(1, 4, 8) :: Row(2, 10, 5) :: Nil)
+ }
+ }
+
// Helpers for checking the arguments passed to the FileFormat.
protected val checkPartitionSchema =
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org