You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/02/12 01:51:31 UTC

[spark] branch branch-3.0 updated: [SPARK-30780][SQL] Empty LocalTableScan should use RDD without partitions

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5199d2f  [SPARK-30780][SQL] Empty LocalTableScan should use RDD without partitions
5199d2f is described below

commit 5199d2f9dcf044f759318457ce3c0a56e00d9537
Author: herman <he...@databricks.com>
AuthorDate: Wed Feb 12 10:48:29 2020 +0900

    [SPARK-30780][SQL] Empty LocalTableScan should use RDD without partitions
    
    ### What changes were proposed in this pull request?
    This is a small follow-up for https://github.com/apache/spark/pull/27400. This PR makes an empty `LocalTableScanExec` return an `RDD` without partitions.
    
    ### Why are the changes needed?
    It is a bit unexpected that the RDD contains partitions if there is not work to do. It also can save a bit of work when this is used in a more complex plan.
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Added test to `SparkPlanSuite`.
    
    Closes #27530 from hvanhovell/SPARK-30780.
    
    Authored-by: herman <he...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit b25359cca3190f6a34dce3c3e49c4d2a80e88bdc)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../org/apache/spark/sql/execution/LocalTableScanExec.scala  | 12 ++++++++----
 .../src/test/scala/org/apache/spark/sql/DataFrameSuite.scala |  2 +-
 .../org/apache/spark/sql/execution/SparkPlanSuite.scala      |  4 ++++
 3 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
index 1b5115f..b452213 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -45,10 +45,14 @@ case class LocalTableScanExec(
     }
   }
 
-  private lazy val numParallelism: Int = math.min(math.max(unsafeRows.length, 1),
-    sqlContext.sparkContext.defaultParallelism)
-
-  private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows, numParallelism)
+  @transient private lazy val rdd: RDD[InternalRow] = {
+    if (rows.isEmpty) {
+      sqlContext.sparkContext.emptyRDD
+    } else {
+      val numSlices = math.min(unsafeRows.length, sqlContext.sparkContext.defaultParallelism)
+      sqlContext.sparkContext.parallelize(unsafeRows, numSlices)
+    }
+  }
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index d2d58a8..694e576 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -330,7 +330,7 @@ class DataFrameSuite extends QueryTest
       testData.select("key").coalesce(1).select("key"),
       testData.select("key").collect().toSeq)
 
-    assert(spark.emptyDataFrame.coalesce(1).rdd.partitions.size === 1)
+    assert(spark.emptyDataFrame.coalesce(1).rdd.partitions.size === 0)
   }
 
   test("convert $\"attribute name\" into unresolved attribute") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index e3bc414..56fff11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -84,4 +84,8 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-30780 empty LocalTableScan should use RDD without partitions") {
+    assert(LocalTableScanExec(Nil, Nil).execute().getNumPartitions == 0)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org