Posted to commits@spark.apache.org by vi...@apache.org on 2021/11/10 03:12:32 UTC

[spark] branch master updated: [SPARK-37221][SQL][FOLLOWUP] Add toRowBased to SparkPlan

This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e1f3f22  [SPARK-37221][SQL][FOLLOWUP] Add toRowBased to SparkPlan
e1f3f22 is described below

commit e1f3f22c3dabfea27880e02cbb5df6533c875795
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Tue Nov 9 19:11:44 2021 -0800

    [SPARK-37221][SQL][FOLLOWUP] Add toRowBased to SparkPlan
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of #34499. Instead of adding `ColumnarToRowExec` inside `getByteArrayRdd`, this patch adds a `toRowBased` API to explicitly request columnar-to-row conversion.
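
    A minimal sketch of the resulting call pattern (`somePlan` is a hypothetical
    plan reference used only for illustration; the `toRowBased` definition is the
    one added in the diff below):

    ```scala
    // Added by this patch: opt in to columnar-to-row conversion explicitly.
    // Plans that do not support columnar output are returned unchanged.
    def toRowBased: SparkPlan = if (supportsColumnar) ColumnarToRowExec(this) else this

    // Hypothetical call site: convert first, then run a row-based action.
    val rows = somePlan.toRowBased.executeCollect()
    ```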
    
    ### Why are the changes needed?
    
    To make the columnar-to-row conversion explicitly selectable by the caller, rather than always applied implicitly inside `getByteArrayRdd`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #34538 from viirya/columnar-followup.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
---
 .../scala/org/apache/spark/sql/execution/SparkPlan.scala     | 12 ++++++------
 .../org/apache/spark/sql/execution/SparkPlanSuite.scala      |  4 ++--
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index ea3b133..5c4266d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -313,6 +313,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
+   * Converts the output of this plan to row-based if it is a columnar plan.
+   */
+  def toRowBased: SparkPlan = if (supportsColumnar) ColumnarToRowExec(this) else this
+
+  /**
    * Packs the UnsafeRows into byte arrays for faster serialization.
    * The byte arrays are in the following format:
    * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
@@ -322,12 +327,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    */
   private def getByteArrayRdd(
       n: Int = -1, takeFromEnd: Boolean = false): RDD[(Long, Array[Byte])] = {
-    val rdd = if (supportsColumnar) {
-      ColumnarToRowExec(this).execute()
-    } else {
-      execute()
-    }
-    rdd.mapPartitionsInternal { iter =>
+    execute().mapPartitionsInternal { iter =>
       var count = 0
       val buffer = new Array[Byte](4 << 10)  // 4K
       val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
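
For context, the doc comment in the hunk above describes the byte-array layout
`[size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]`. Below is a
minimal, self-contained sketch of that framing, assuming plain in-memory streams;
it is illustrative only and omits the compression codec, the 4K buffer, and the
count/limit handling that the real `getByteArrayRdd` performs:

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Illustrative framing only: [size][bytes] repeated, terminated by -1.
// A reader consumes frames until it hits the -1 end-of-stream marker.
def packRows(rows: Iterator[Array[Byte]]): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new DataOutputStream(bytes)
  rows.foreach { row =>
    out.writeInt(row.length) // [size]
    out.write(row)           // [bytes of UnsafeRow]
  }
  out.writeInt(-1)           // end-of-stream marker
  out.flush()
  bytes.toByteArray
}
```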
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index c9bbee2..bc4dfcb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -116,12 +116,12 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
   }
 
   test("SPARK-37221: The collect-like API in SparkPlan should support columnar output") {
-    val emptyResults = ColumnarOp(LocalTableScanExec(Nil, Nil)).executeCollect()
+    val emptyResults = ColumnarOp(LocalTableScanExec(Nil, Nil)).toRowBased.executeCollect()
     assert(emptyResults.isEmpty)
 
     val relation = LocalTableScanExec(
       Seq(AttributeReference("val", IntegerType)()), Seq(InternalRow(1)))
-    val nonEmpty = ColumnarOp(relation).executeCollect()
+    val nonEmpty = ColumnarOp(relation).toRowBased.executeCollect()
     assert(nonEmpty === relation.executeCollect())
   }
 }
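
Note the shape of the test change above: `.toRowBased` can be chained
unconditionally before a row-based action such as `executeCollect()`, because
the method returns the plan itself when `supportsColumnar` is false; only
genuinely columnar plans get wrapped in `ColumnarToRowExec`.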

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org