Posted to commits@spark.apache.org by vi...@apache.org on 2021/11/10 03:12:32 UTC
[spark] branch master updated: [SPARK-37221][SQL][FOLLOWUP] Add toRowBased to SparkPlan
This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e1f3f22 [SPARK-37221][SQL][FOLLOWUP] Add toRowBased to SparkPlan
e1f3f22 is described below
commit e1f3f22c3dabfea27880e02cbb5df6533c875795
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Tue Nov 9 19:11:44 2021 -0800
[SPARK-37221][SQL][FOLLOWUP] Add toRowBased to SparkPlan
### What changes were proposed in this pull request?
This is a follow-up of #34499. Instead of adding `ColumnarToRowExec` inside `getByteArrayRdd`, this patch adds a `toRowBased` API to `SparkPlan` so callers can explicitly request columnar-to-row conversion.
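As a minimal sketch of the dispatch this adds (a toy model with invented class names, not the actual `SparkPlan` hierarchy from this commit): a plan that reports columnar support gets wrapped in an explicit conversion node, while an already row-based plan is returned unchanged.

```scala
// Toy model for illustration only: the class names below are invented.
// In the real patch the wrapper is ColumnarToRowExec and the method
// lives on SparkPlan, as shown in the diff further down.
sealed trait Plan {
  def supportsColumnar: Boolean
  // Mirrors the one-liner added in this commit.
  def toRowBased: Plan = if (supportsColumnar) RowConversion(this) else this
}
case class ColumnarScan(table: String) extends Plan { val supportsColumnar = true }
case class RowScan(table: String) extends Plan { val supportsColumnar = false }
case class RowConversion(child: Plan) extends Plan { val supportsColumnar = false }

object ToRowBasedDemo extends App {
  assert(ColumnarScan("t").toRowBased == RowConversion(ColumnarScan("t")))
  assert(RowScan("t").toRowBased == RowScan("t")) // already row-based: no-op
}
```

The test changes in this diff exercise exactly this pattern, calling `toRowBased` before `executeCollect()` on a columnar plan.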
### Why are the changes needed?
To make the columnar-to-row conversion explicitly selectable by callers, rather than applied implicitly inside `getByteArrayRdd`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #34538 from viirya/columnar-followup.
Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
---
.../scala/org/apache/spark/sql/execution/SparkPlan.scala | 12 ++++++------
.../org/apache/spark/sql/execution/SparkPlanSuite.scala | 4 ++--
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index ea3b133..5c4266d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -313,6 +313,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
}
/**
+ * Converts the output of this plan to row-based if it is a columnar plan.
+ */
+ def toRowBased: SparkPlan = if (supportsColumnar) ColumnarToRowExec(this) else this
+
+ /**
* Packing the UnsafeRows into byte array for faster serialization.
* The byte arrays are in the following format:
* [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
@@ -322,12 +327,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
*/
private def getByteArrayRdd(
n: Int = -1, takeFromEnd: Boolean = false): RDD[(Long, Array[Byte])] = {
- val rdd = if (supportsColumnar) {
- ColumnarToRowExec(this).execute()
- } else {
- execute()
- }
- rdd.mapPartitionsInternal { iter =>
+ execute().mapPartitionsInternal { iter =>
var count = 0
val buffer = new Array[Byte](4 << 10) // 4K
val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
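For reference, the `[size] [bytes of UnsafeRow] ... [-1]` layout that the `getByteArrayRdd` comment describes can be sketched standalone as below. This uses plain `java.io` streams with arbitrary byte payloads standing in for serialized `UnsafeRow`s; it is not Spark's implementation, which additionally runs the stream through the compression codec created above.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

// Standalone sketch of the length-prefixed framing described in the comment:
// each record is a 4-byte big-endian size followed by its bytes; -1 terminates.
object FramingSketch extends App {
  def pack(records: Seq[Array[Byte]]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    records.foreach { r => out.writeInt(r.length); out.write(r) }
    out.writeInt(-1) // end-of-stream marker
    out.flush()
    bytes.toByteArray
  }

  def unpack(packed: Array[Byte]): List[Array[Byte]] = {
    val in = new DataInputStream(new ByteArrayInputStream(packed))
    Iterator
      .continually(in.readInt())   // read the next record's size
      .takeWhile(_ != -1)          // stop at the terminator
      .map { size =>
        val buf = new Array[Byte](size)
        in.readFully(buf)          // read exactly `size` payload bytes
        buf
      }
      .toList
  }

  val payloads = Seq("row-1".getBytes("UTF-8"), "row-22".getBytes("UTF-8"))
  assert(unpack(pack(payloads)).map(new String(_, "UTF-8")) == List("row-1", "row-22"))
}
```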
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index c9bbee2..bc4dfcb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -116,12 +116,12 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
}
test("SPARK-37221: The collect-like API in SparkPlan should support columnar output") {
- val emptyResults = ColumnarOp(LocalTableScanExec(Nil, Nil)).executeCollect()
+ val emptyResults = ColumnarOp(LocalTableScanExec(Nil, Nil)).toRowBased.executeCollect()
assert(emptyResults.isEmpty)
val relation = LocalTableScanExec(
Seq(AttributeReference("val", IntegerType)()), Seq(InternalRow(1)))
- val nonEmpty = ColumnarOp(relation).executeCollect()
+ val nonEmpty = ColumnarOp(relation).toRowBased.executeCollect()
assert(nonEmpty === relation.executeCollect())
}
}