You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by vi...@apache.org on 2021/11/19 03:16:28 UTC

[spark] branch master updated: [SPARK-37371][SQL] UnionExec should support columnar if all children support columnar

This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0398b5b  [SPARK-37371][SQL] UnionExec should support columnar if all children support columnar
0398b5b is described below

commit 0398b5b5ebc48cf46d18c090443c5e54b16d5b35
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Thu Nov 18 19:15:24 2021 -0800

    [SPARK-37371][SQL] UnionExec should support columnar if all children support columnar
    
    ### What changes were proposed in this pull request?
    
    This patch makes `UnionExec` support columnar execution when all of its children support it.
    
    ### Why are the changes needed?
    
    For `UnionExec`, if all of its children support columnar execution, the union physical plan should be able to support it too, as it is just a union of all partitions from the children. By doing this, we can avoid unnecessary transitions between rows and columnar batches.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests, plus a new test added to `DataFrameSetOperationsSuite`.
    
    Closes #34645 from viirya/SPARK-37371.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
---
 .../sql/execution/basicPhysicalOperators.scala     |  7 ++++
 .../spark/sql/DataFrameSetOperationsSuite.scala    | 37 ++++++++++++++++++++++
 2 files changed, 44 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 8e0080a..763abbe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{LongType, StructType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.ThreadUtils
 import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
 
@@ -697,6 +698,12 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan {
   protected override def doExecute(): RDD[InternalRow] =
     sparkContext.union(children.map(_.execute()))
 
+  override def supportsColumnar: Boolean = children.forall(_.supportsColumnar)
+
+  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    sparkContext.union(children.map(_.executeColumnar()))
+  }
+
   override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): UnionExec =
     copy(children = newChildren)
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
index 650d878..26df517 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSetOperationsSuite.scala
@@ -21,6 +21,8 @@ import java.sql.{Date, Timestamp}
 
 import org.apache.spark.sql.catalyst.optimizer.RemoveNoopUnion
 import org.apache.spark.sql.catalyst.plans.logical.Union
+import org.apache.spark.sql.execution.{SparkPlan, UnionExec}
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSparkSession, SQLTestData}
@@ -1355,6 +1357,41 @@ class DataFrameSetOperationsSuite extends QueryTest with SharedSparkSession {
       Row(Row(Seq(Seq(Row("bb", null))))) ::
       Row(Row(Seq(Seq(Row(null, "ba"))))) :: Nil)
   }
+
+  test("SPARK-37371: UnionExec should support columnar if all children support columnar") {
+    def checkIfColumnar(
+        plan: SparkPlan,
+        targetPlan: (SparkPlan) => Boolean,
+        isColumnar: Boolean): Unit = {
+      val target = plan.collect {
+        case p if targetPlan(p) => p
+      }
+      assert(target.nonEmpty)
+      assert(target.forall(_.supportsColumnar == isColumnar))
+    }
+
+    Seq(true, false).foreach { supported =>
+      withSQLConf(SQLConf.CACHE_VECTORIZED_READER_ENABLED.key -> supported.toString) {
+        val df1 = Seq(1, 2, 3).toDF("i").cache()
+        val df2 = Seq(4, 5, 6).toDF("j").cache()
+
+        checkIfColumnar(df1.queryExecution.executedPlan,
+          _.isInstanceOf[InMemoryTableScanExec], supported)
+        checkIfColumnar(df2.queryExecution.executedPlan,
+          _.isInstanceOf[InMemoryTableScanExec], supported)
+
+        val union = df1.union(df2)
+        checkIfColumnar(union.queryExecution.executedPlan, _.isInstanceOf[UnionExec], supported)
+        checkAnswer(union, Row(1) :: Row(2) :: Row(3) :: Row(4) :: Row(5) :: Row(6) :: Nil)
+
+        val nonColumnarUnion = df1.union(Seq(7, 8, 9).toDF("k"))
+        checkIfColumnar(nonColumnarUnion.queryExecution.executedPlan,
+          _.isInstanceOf[UnionExec], false)
+        checkAnswer(nonColumnarUnion,
+          Row(1) :: Row(2) :: Row(3) :: Row(7) :: Row(8) :: Row(9) :: Nil)
+      }
+    }
+  }
 }
 
 case class UnionClass1a(a: Int, b: Long, nested: UnionClass2)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org