Posted to commits@spark.apache.org by we...@apache.org on 2021/08/09 08:34:25 UTC

[spark] branch master updated: [SPARK-35881][SQL][FOLLOWUP] Add a boolean flag in AdaptiveSparkPlanExec to ask for columnar output

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8714eef  [SPARK-35881][SQL][FOLLOWUP] Add a boolean flag in AdaptiveSparkPlanExec to ask for columnar output
8714eef is described below

commit 8714eefe6f975e6b106b59e2ab3af53df4555dce
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Aug 9 16:33:52 2021 +0800

    [SPARK-35881][SQL][FOLLOWUP] Add a boolean flag in AdaptiveSparkPlanExec to ask for columnar output
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up to https://github.com/apache/spark/pull/33140 that proposes a simpler approach for integrating columnar execution into AQE.
    
    Instead of making `ColumnarToRowExec` and `RowToColumnarExec` dynamic so that they can handle `AdaptiveSparkPlanExec`, it is simpler to let the consumer decide whether it needs columnar output and pass a boolean flag to `AdaptiveSparkPlanExec`.
    
    Spark vendors can set the flag accordingly in their custom columnar Parquet writing command when the input plan is `AdaptiveSparkPlanExec`, as sketched below.
    
    One open question is whether a consumer ever needs to inspect the final AQE plan and consume the data differently (row-based vs. columnar format). I can't think of such a use case, and I believe we can always know statically whether the AQE plan should output row-based or columnar data.
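    
    Below is a minimal, illustrative sketch (not part of this commit) of how a consumer could flip the flag; `ColumnarConsumerSketch` and `columnarInput` are hypothetical names used only for illustration, while the classes and the `supportsColumnar` parameter come from this change.
    
    ```scala
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
    
    object ColumnarConsumerSketch {
      // Re-create the adaptive plan with the new flag so that its final stage keeps
      // columnar output instead of having a ColumnarToRowExec inserted on top.
      def columnarInput(plan: SparkPlan): SparkPlan = plan match {
        case a: AdaptiveSparkPlanExec => a.copy(supportsColumnar = true)
        case other => other
      }
    }
    
    // A custom columnar write command could then consume batches directly:
    //   ColumnarConsumerSketch.columnarInput(plan).executeColumnar()
    ```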
    
    ### Why are the changes needed?
    
    Code simplification.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manual test.
    
    Closes #33624 from cloud-fan/aqe.
    
    Lead-authored-by: Wenchen Fan <we...@databricks.com>
    Co-authored-by: Wenchen Fan <cl...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/execution/Columnar.scala  | 156 +++++++++------------
 .../spark/sql/execution/QueryExecution.scala       |   3 +-
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  52 +++----
 .../spark/sql/execution/ColumnarRulesSuite.scala   |   4 +-
 4 files changed, 93 insertions(+), 122 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 406e757..a70dba8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.types._
@@ -66,9 +65,7 @@ trait ColumnarToRowTransition extends UnaryExecNode
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
 case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport {
-  // child plan must be columnar or an adaptive plan, which could either be row-based or
-  // columnar, but we don't know until we execute it
-  assert(child.supportsColumnar || child.isInstanceOf[AdaptiveSparkPlanExec])
+  assert(child.supportsColumnar)
 
   override def output: Seq[Attribute] = child.output
 
@@ -86,25 +83,18 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
   )
 
   override def doExecute(): RDD[InternalRow] = {
-    child match {
-      case a: AdaptiveSparkPlanExec if !a.finalPlanSupportsColumnar() =>
-        // if the child plan is adaptive and resulted in rows rather than columnar data
-        // then we can bypass any transition
-        a.execute()
-      case _ =>
-        val numOutputRows = longMetric("numOutputRows")
-        val numInputBatches = longMetric("numInputBatches")
-        // This avoids calling `output` in the RDD closure, so that we don't need to include
-        // the entire plan (this) in the closure.
-        val localOutput = this.output
-        child.executeColumnar().mapPartitionsInternal { batches =>
-          val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
-          batches.flatMap { batch =>
-            numInputBatches += 1
-            numOutputRows += batch.numRows()
-            batch.rowIterator().asScala.map(toUnsafe)
-          }
-        }
+    val numOutputRows = longMetric("numOutputRows")
+    val numInputBatches = longMetric("numInputBatches")
+    // This avoids calling `output` in the RDD closure, so that we don't need to include the entire
+    // plan (this) in the closure.
+    val localOutput = this.output
+    child.executeColumnar().mapPartitionsInternal { batches =>
+      val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
+      batches.flatMap { batch =>
+        numInputBatches += 1
+        numOutputRows += batch.numRows()
+        batch.rowIterator().asScala.map(toUnsafe)
+      }
     }
   }
 
@@ -429,10 +419,6 @@ trait RowToColumnarTransition extends UnaryExecNode
  * would only be to reduce code.
  */
 case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
-  // child plan must be row-based or an adaptive plan, which could either be row-based or
-  // columnar, but we don't know until we execute it
-  assert(!child.supportsColumnar || child.isInstanceOf[AdaptiveSparkPlanExec])
-
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
@@ -455,60 +441,52 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
   )
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    child match {
-      case a: AdaptiveSparkPlanExec if a.finalPlanSupportsColumnar() =>
-        // if the child plan is adaptive and resulted in columnar data
-        // then we can bypass any transition
-        a.executeColumnar()
-      case _ =>
-        val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
-        val numInputRows = longMetric("numInputRows")
-        val numOutputBatches = longMetric("numOutputBatches")
-        // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
-        // combine with some of the Arrow conversion tools we will need to unify some of the
-        // configs.
-        val numRows = conf.columnBatchSize
-        // This avoids calling `schema` in the RDD closure, so that we don't need to include the
-        // entire plan (this) in the closure.
-        val localSchema = this.schema
-        child.execute().mapPartitionsInternal { rowIterator =>
-          if (rowIterator.hasNext) {
-            new Iterator[ColumnarBatch] {
-              private val converters = new RowToColumnConverter(localSchema)
-              private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
-                OffHeapColumnVector.allocateColumns(numRows, localSchema)
-              } else {
-                OnHeapColumnVector.allocateColumns(numRows, localSchema)
-              }
-              private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
-
-              TaskContext.get().addTaskCompletionListener[Unit] { _ =>
-                cb.close()
-              }
-
-              override def hasNext: Boolean = {
-                rowIterator.hasNext
-              }
-
-              override def next(): ColumnarBatch = {
-                cb.setNumRows(0)
-                vectors.foreach(_.reset())
-                var rowCount = 0
-                while (rowCount < numRows && rowIterator.hasNext) {
-                  val row = rowIterator.next()
-                  converters.convert(row, vectors.toArray)
-                  rowCount += 1
-                }
-                cb.setNumRows(rowCount)
-                numInputRows += rowCount
-                numOutputBatches += 1
-                cb
-              }
-            }
+    val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
+    val numInputRows = longMetric("numInputRows")
+    val numOutputBatches = longMetric("numOutputBatches")
+    // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
+    // combine with some of the Arrow conversion tools we will need to unify some of the configs.
+    val numRows = conf.columnBatchSize
+    // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
+    // plan (this) in the closure.
+    val localSchema = this.schema
+    child.execute().mapPartitionsInternal { rowIterator =>
+      if (rowIterator.hasNext) {
+        new Iterator[ColumnarBatch] {
+          private val converters = new RowToColumnConverter(localSchema)
+          private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
+            OffHeapColumnVector.allocateColumns(numRows, localSchema)
           } else {
-            Iterator.empty
+            OnHeapColumnVector.allocateColumns(numRows, localSchema)
+          }
+          private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
+
+          TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+            cb.close()
+          }
+
+          override def hasNext: Boolean = {
+            rowIterator.hasNext
+          }
+
+          override def next(): ColumnarBatch = {
+            cb.setNumRows(0)
+            vectors.foreach(_.reset())
+            var rowCount = 0
+            while (rowCount < numRows && rowIterator.hasNext) {
+              val row = rowIterator.next()
+              converters.convert(row, vectors.toArray)
+              rowCount += 1
+            }
+            cb.setNumRows(rowCount)
+            numInputRows += rowCount
+            numOutputBatches += 1
+            cb
           }
         }
+      } else {
+        Iterator.empty
+      }
     }
   }
 
@@ -519,9 +497,13 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
 /**
  * Apply any user defined [[ColumnarRule]]s and find the correct place to insert transitions
  * to/from columnar formatted data.
+ *
+ * @param columnarRules custom columnar rules
+ * @param outputsColumnar whether or not the produced plan should output columnar format.
  */
 case class ApplyColumnarRulesAndInsertTransitions(
-    columnarRules: Seq[ColumnarRule])
+    columnarRules: Seq[ColumnarRule],
+    outputsColumnar: Boolean)
   extends Rule[SparkPlan] {
 
   /**
@@ -531,7 +513,7 @@ case class ApplyColumnarRulesAndInsertTransitions(
     if (!plan.supportsColumnar) {
       // The tree feels kind of backwards
       // Columnar Processing will start here, so transition from row to columnar
-      RowToColumnarExec(insertTransitions(plan))
+      RowToColumnarExec(insertTransitions(plan, outputsColumnar = false))
     } else if (!plan.isInstanceOf[RowToColumnarTransition]) {
       plan.withNewChildren(plan.children.map(insertRowToColumnar))
     } else {
@@ -542,13 +524,15 @@ case class ApplyColumnarRulesAndInsertTransitions(
   /**
    * Inserts RowToColumnarExecs and ColumnarToRowExecs where needed.
    */
-  private def insertTransitions(plan: SparkPlan): SparkPlan = {
-    if (plan.supportsColumnar) {
-      // The tree feels kind of backwards
-      // This is the end of the columnar processing so go back to rows
+  private def insertTransitions(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
+    if (outputsColumnar) {
+      insertRowToColumnar(plan)
+    } else if (plan.supportsColumnar) {
+      // `outputsColumnar` is false but the plan outputs columnar format, so add a
+      // to-row transition here.
       ColumnarToRowExec(insertRowToColumnar(plan))
     } else if (!plan.isInstanceOf[ColumnarToRowTransition]) {
-      plan.withNewChildren(plan.children.map(insertTransitions))
+      plan.withNewChildren(plan.children.map(insertTransitions(_, outputsColumnar = false)))
     } else {
       plan
     }
@@ -558,7 +542,7 @@ case class ApplyColumnarRulesAndInsertTransitions(
     var preInsertPlan: SparkPlan = plan
     columnarRules.foreach((r : ColumnarRule) =>
       preInsertPlan = r.preColumnarTransitions(preInsertPlan))
-    var postInsertPlan = insertTransitions(preInsertPlan)
+    var postInsertPlan = insertTransitions(preInsertPlan, outputsColumnar)
     columnarRules.reverse.foreach((r : ColumnarRule) =>
       postInsertPlan = r.postColumnarTransitions(postInsertPlan))
     postInsertPlan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 6c16dce..361a910 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -414,7 +414,8 @@ object QueryExecution {
       // number of partitions when instantiating PartitioningCollection.
       RemoveRedundantSorts,
       DisableUnnecessaryBucketedScan,
-      ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules),
+      ApplyColumnarRulesAndInsertTransitions(
+        sparkSession.sessionState.columnarRules, outputsColumnar = false),
       CollapseCodegenStages()) ++
       (if (subquery) {
         Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 9db4574..2c242d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -65,7 +65,8 @@ case class AdaptiveSparkPlanExec(
     inputPlan: SparkPlan,
     @transient context: AdaptiveExecutionContext,
     @transient preprocessingRules: Seq[Rule[SparkPlan]],
-    @transient isSubquery: Boolean)
+    @transient isSubquery: Boolean,
+    @transient override val supportsColumnar: Boolean = false)
   extends LeafExecNode {
 
   @transient private val lock = new Object()
@@ -99,7 +100,7 @@ case class AdaptiveSparkPlanExec(
   // A list of physical plan rules to be applied before creation of query stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition or removal of
   // Exchange nodes) after running these rules.
-  private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
+  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
     RemoveRedundantProjects,
     // For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
     // the final plan, but we do need to respect the user-specified repartition. Here we ask
@@ -124,12 +125,14 @@ case class AdaptiveSparkPlanExec(
     OptimizeShuffleWithLocalRead
   )
 
+  @transient private val staticPostStageCreationRules: Seq[Rule[SparkPlan]] =
+    CollapseCodegenStages() +: context.session.sessionState.postStageCreationRules
+
   // A list of physical optimizer rules to be applied right after a new stage is created. The input
   // plan to these rules has exchange as its root node.
-  @transient private val postStageCreationRules = Seq(
-    ApplyColumnarRulesAndInsertTransitions(context.session.sessionState.columnarRules),
-    CollapseCodegenStages()
-  ) ++ context.session.sessionState.postStageCreationRules
+  private def postStageCreationRules(outputsColumnar: Boolean) =
+    ApplyColumnarRulesAndInsertTransitions(
+      context.session.sessionState.columnarRules, outputsColumnar) +: staticPostStageCreationRules
 
   private def optimizeQueryStage(plan: SparkPlan, isFinalStage: Boolean): SparkPlan = {
     val optimized = queryStageOptimizerRules.foldLeft(plan) { case (latestPlan, rule) =>
@@ -196,13 +199,6 @@ case class AdaptiveSparkPlanExec(
 
   override def doCanonicalize(): SparkPlan = inputPlan.canonicalized
 
-  // This operator reports that output is row-based but because of the adaptive nature of
-  // execution, we don't really know whether the output is going to row-based or columnar
-  // until we start running the query, so there is a finalPlanSupportsColumnar method that
-  // can be called at execution time to determine what the output format is.
-  // This operator can safely be wrapped in either RowToColumnarExec or ColumnarToRowExec.
-  override def supportsColumnar: Boolean = false
-
   override def resetMetrics(): Unit = {
     metrics.valuesIterator.foreach(_.reset())
     executedPlan.resetMetrics()
@@ -313,7 +309,7 @@ case class AdaptiveSparkPlanExec(
       // Run the final plan when there's no more unfinished stages.
       currentPhysicalPlan = applyPhysicalRules(
         optimizeQueryStage(result.newPlan, isFinalStage = true),
-        postStageCreationRules,
+        postStageCreationRules(supportsColumnar),
         Some((planChangeLogger, "AQE Post Stage Creation")))
       isFinalPlan = true
       executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
@@ -348,21 +344,17 @@ case class AdaptiveSparkPlanExec(
     withFinalPlanUpdate(_.execute())
   }
 
-  /**
-   * Determine if the final query stage supports columnar execution. Calling this method
-   * will trigger query execution of child query stages if they have not already executed.
-   *
-   * If this method returns true then it is safe to call doExecuteColumnar to execute the
-   * final stage.
-   */
-  def finalPlanSupportsColumnar(): Boolean = {
-    getFinalPhysicalPlan().supportsColumnar
-  }
-
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     withFinalPlanUpdate(_.executeColumnar())
   }
 
+  override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
+    withFinalPlanUpdate { finalPlan =>
+      assert(finalPlan.isInstanceOf[BroadcastQueryStageExec])
+      finalPlan.doExecuteBroadcast()
+    }
+  }
+
   private def withFinalPlanUpdate[T](fun: SparkPlan => T): T = {
     val plan = getFinalPhysicalPlan()
     val result = fun(plan)
@@ -370,12 +362,6 @@ case class AdaptiveSparkPlanExec(
     result
   }
 
-  override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
-    val finalPlan = getFinalPhysicalPlan()
-    assert(finalPlan.isInstanceOf[BroadcastQueryStageExec])
-    finalPlan.doExecuteBroadcast()
-  }
-
   protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
 
   override def generateTreeString(
@@ -536,7 +522,7 @@ case class AdaptiveSparkPlanExec(
       case s: ShuffleExchangeLike =>
         val newShuffle = applyPhysicalRules(
           s.withNewChildren(Seq(optimizedPlan)),
-          postStageCreationRules,
+          postStageCreationRules(outputsColumnar = false),
           Some((planChangeLogger, "AQE Post Stage Creation")))
         if (!newShuffle.isInstanceOf[ShuffleExchangeLike]) {
           throw new IllegalStateException(
@@ -546,7 +532,7 @@ case class AdaptiveSparkPlanExec(
       case b: BroadcastExchangeLike =>
         val newBroadcast = applyPhysicalRules(
           b.withNewChildren(Seq(optimizedPlan)),
-          postStageCreationRules,
+          postStageCreationRules(outputsColumnar = false),
           Some((planChangeLogger, "AQE Post Stage Creation")))
         if (!newBroadcast.isInstanceOf[BroadcastExchangeLike]) {
           throw new IllegalStateException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala
index df08acd..75223a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala
@@ -27,7 +27,7 @@ class ColumnarRulesSuite extends PlanTest with SharedSparkSession {
 
   test("Idempotency of columnar rules - RowToColumnar/ColumnarToRow") {
     val rules = ApplyColumnarRulesAndInsertTransitions(
-      spark.sessionState.columnarRules)
+      spark.sessionState.columnarRules, false)
 
     val plan = UnaryOp(UnaryOp(LeafOp(false), true), false)
     val expected =
@@ -40,7 +40,7 @@ class ColumnarRulesSuite extends PlanTest with SharedSparkSession {
 
   test("Idempotency of columnar rules - ColumnarToRow/RowToColumnar") {
     val rules = ApplyColumnarRulesAndInsertTransitions(
-      spark.sessionState.columnarRules)
+      spark.sessionState.columnarRules, false)
 
     val plan = UnaryOp(UnaryOp(LeafOp(true), false), true)
     val expected = ColumnarToRowExec(
