You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/02/12 01:51:31 UTC
[spark] branch branch-3.0 updated: [SPARK-30780][SQL] Empty
LocalTableScan should use RDD without partitions
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 5199d2f [SPARK-30780][SQL] Empty LocalTableScan should use RDD without partitions
5199d2f is described below
commit 5199d2f9dcf044f759318457ce3c0a56e00d9537
Author: herman <he...@databricks.com>
AuthorDate: Wed Feb 12 10:48:29 2020 +0900
[SPARK-30780][SQL] Empty LocalTableScan should use RDD without partitions
### What changes were proposed in this pull request?
This is a small follow-up for https://github.com/apache/spark/pull/27400. This PR makes an empty `LocalTableScanExec` return an `RDD` without partitions.
### Why are the changes needed?
It is a bit unexpected that the RDD contains partitions if there is no work to do. It can also save a bit of work when this is used in a more complex plan.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Added test to `SparkPlanSuite`.
Closes #27530 from hvanhovell/SPARK-30780.
Authored-by: herman <he...@databricks.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit b25359cca3190f6a34dce3c3e49c4d2a80e88bdc)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../org/apache/spark/sql/execution/LocalTableScanExec.scala | 12 ++++++++----
.../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala | 2 +-
.../org/apache/spark/sql/execution/SparkPlanSuite.scala | 4 ++++
3 files changed, 13 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index 1b5115f..b452213 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -45,10 +45,14 @@ case class LocalTableScanExec(
}
}
- private lazy val numParallelism: Int = math.min(math.max(unsafeRows.length, 1),
- sqlContext.sparkContext.defaultParallelism)
-
- private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows, numParallelism)
+ @transient private lazy val rdd: RDD[InternalRow] = {
+ if (rows.isEmpty) {
+ sqlContext.sparkContext.emptyRDD
+ } else {
+ val numSlices = math.min(unsafeRows.length, sqlContext.sparkContext.defaultParallelism)
+ sqlContext.sparkContext.parallelize(unsafeRows, numSlices)
+ }
+ }
protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index d2d58a8..694e576 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -330,7 +330,7 @@ class DataFrameSuite extends QueryTest
testData.select("key").coalesce(1).select("key"),
testData.select("key").collect().toSeq)
- assert(spark.emptyDataFrame.coalesce(1).rdd.partitions.size === 1)
+ assert(spark.emptyDataFrame.coalesce(1).rdd.partitions.size === 0)
}
test("convert $\"attribute name\" into unresolved attribute") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index e3bc414..56fff11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -84,4 +84,8 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
}
}
}
+
+ test("SPARK-30780 empty LocalTableScan should use RDD without partitions") {
+ assert(LocalTableScanExec(Nil, Nil).execute().getNumPartitions == 0)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org