You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2021/07/30 20:39:31 UTC

[spark] branch branch-3.2 updated: [SPARK-35881][SQL] Add support for columnar execution of final query stage in AdaptiveSparkPlanExec

This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f9f5656  [SPARK-35881][SQL] Add support for columnar execution of final query stage in AdaptiveSparkPlanExec
f9f5656 is described below

commit f9f5656491c1edbfbc9f8c13840aaa935c49037f
Author: Andy Grove <an...@gmail.com>
AuthorDate: Fri Jul 30 13:21:50 2021 -0500

    [SPARK-35881][SQL] Add support for columnar execution of final query stage in AdaptiveSparkPlanExec
    
    ### What changes were proposed in this pull request?
    
    Changes in this PR:
    
    - `AdaptiveSparkPlanExec` has new methods `finalPlanSupportsColumnar` and `doExecuteColumnar` to support adaptive queries where the final query stage produces columnar data.
    - `SessionState` now has a new set of injectable rules named `postStageCreationRules` that can be applied to the final query stage.
    - `AdaptiveSparkPlanExec` can now safely be wrapped by either `RowToColumnarExec` or `ColumnarToRowExec`.
    
    A Spark plugin can use the new rules to remove the root `ColumnarToRowExec` transition that is inserted by previous rules and at execution time can call `finalPlanSupportsColumnar` to see if the final query stage is columnar. If the plan is columnar then the plugin can safely call `doExecuteColumnar`. The adaptive plan can be wrapped in either `RowToColumnarExec` or `ColumnarToRowExec` to force a particular output format. There are fast paths in both of these operators to avoid any re [...]
    
    ### Why are the changes needed?
    
    Without this change it is necessary to use reflection to get the final physical plan to determine whether it is columnar and to execute it as a columnar plan. `AdaptiveSparkPlanExec` only provides public methods for row-based execution.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I have manually tested this patch with the RAPIDS Accelerator for Apache Spark.
    
    Closes #33140 from andygrove/support-columnar-adaptive.
    
    Authored-by: Andy Grove <an...@gmail.com>
    Signed-off-by: Thomas Graves <tg...@apache.org>
    (cherry picked from commit 0f538402fb76e4d6182cc881219d53b5fdf73af1)
    Signed-off-by: Thomas Graves <tg...@apache.org>
---
 .../apache/spark/sql/SparkSessionExtensions.scala  |  22 +++-
 .../org/apache/spark/sql/execution/Columnar.scala  | 134 ++++++++++++---------
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  46 +++++--
 .../sql/internal/BaseSessionStateBuilder.scala     |   7 +-
 .../apache/spark/sql/internal/SessionState.scala   |   3 +-
 5 files changed, 141 insertions(+), 71 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
index b14dce6..18ebae5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
  * <li>(External) Catalog listeners.</li>
  * <li>Columnar Rules.</li>
  * <li>Adaptive Query Stage Preparation Rules.</li>
+ * <li>Adaptive Query Post Stage Preparation Rules.</li>
  * </ul>
  *
  * The extensions can be used by calling `withExtensions` on the [[SparkSession.Builder]], for
@@ -110,9 +111,12 @@ class SparkSessionExtensions {
   type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder)
   type ColumnarRuleBuilder = SparkSession => ColumnarRule
   type QueryStagePrepRuleBuilder = SparkSession => Rule[SparkPlan]
+  type PostStageCreationRuleBuilder = SparkSession => Rule[SparkPlan]
 
   private[this] val columnarRuleBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
   private[this] val queryStagePrepRuleBuilders = mutable.Buffer.empty[QueryStagePrepRuleBuilder]
+  private[this] val postStageCreationRuleBuilders =
+    mutable.Buffer.empty[PostStageCreationRuleBuilder]
 
   /**
    * Build the override rules for columnar execution.
@@ -129,6 +133,14 @@ class SparkSessionExtensions {
   }
 
   /**
+   * Build the override rules for the final query stage preparation phase of adaptive query
+   * execution.
+   */
+  private[sql] def buildPostStageCreationRules(session: SparkSession): Seq[Rule[SparkPlan]] = {
+    postStageCreationRuleBuilders.map(_.apply(session)).toSeq
+  }
+
+  /**
    * Inject a rule that can override the columnar execution of an executor.
    */
   def injectColumnar(builder: ColumnarRuleBuilder): Unit = {
@@ -136,13 +148,21 @@ class SparkSessionExtensions {
   }
 
   /**
-   * Inject a rule that can override the the query stage preparation phase of adaptive query
+   * Inject a rule that can override the query stage preparation phase of adaptive query
    * execution.
    */
   def injectQueryStagePrepRule(builder: QueryStagePrepRuleBuilder): Unit = {
     queryStagePrepRuleBuilders += builder
   }
 
+  /**
+   * Inject a rule that can override the final query stage preparation phase of adaptive query
+   * execution.
+   */
+  def injectPostStageCreationRule(builder: PostStageCreationRuleBuilder): Unit = {
+    postStageCreationRuleBuilders += builder
+  }
+
   private[this] val resolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 2be8233..406e757 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.types._
@@ -65,7 +66,9 @@ trait ColumnarToRowTransition extends UnaryExecNode
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
 case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport {
-  assert(child.supportsColumnar)
+  // child plan must be columnar or an adaptive plan, which could either be row-based or
+  // columnar, but we don't know until we execute it
+  assert(child.supportsColumnar || child.isInstanceOf[AdaptiveSparkPlanExec])
 
   override def output: Seq[Attribute] = child.output
 
@@ -83,18 +86,25 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
   )
 
   override def doExecute(): RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
-    val numInputBatches = longMetric("numInputBatches")
-    // This avoids calling `output` in the RDD closure, so that we don't need to include the entire
-    // plan (this) in the closure.
-    val localOutput = this.output
-    child.executeColumnar().mapPartitionsInternal { batches =>
-      val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
-      batches.flatMap { batch =>
-        numInputBatches += 1
-        numOutputRows += batch.numRows()
-        batch.rowIterator().asScala.map(toUnsafe)
-      }
+    child match {
+      case a: AdaptiveSparkPlanExec if !a.finalPlanSupportsColumnar() =>
+        // if the child plan is adaptive and resulted in rows rather than columnar data
+        // then we can bypass any transition
+        a.execute()
+      case _ =>
+        val numOutputRows = longMetric("numOutputRows")
+        val numInputBatches = longMetric("numInputBatches")
+        // This avoids calling `output` in the RDD closure, so that we don't need to include
+        // the entire plan (this) in the closure.
+        val localOutput = this.output
+        child.executeColumnar().mapPartitionsInternal { batches =>
+          val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
+          batches.flatMap { batch =>
+            numInputBatches += 1
+            numOutputRows += batch.numRows()
+            batch.rowIterator().asScala.map(toUnsafe)
+          }
+        }
     }
   }
 
@@ -419,6 +429,10 @@ trait RowToColumnarTransition extends UnaryExecNode
  * would only be to reduce code.
  */
 case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
+  // child plan must be row-based or an adaptive plan, which could either be row-based or
+  // columnar, but we don't know until we execute it
+  assert(!child.supportsColumnar || child.isInstanceOf[AdaptiveSparkPlanExec])
+
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
@@ -441,52 +455,60 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
   )
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
-    val numInputRows = longMetric("numInputRows")
-    val numOutputBatches = longMetric("numOutputBatches")
-    // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
-    // combine with some of the Arrow conversion tools we will need to unify some of the configs.
-    val numRows = conf.columnBatchSize
-    // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
-    // plan (this) in the closure.
-    val localSchema = this.schema
-    child.execute().mapPartitionsInternal { rowIterator =>
-      if (rowIterator.hasNext) {
-        new Iterator[ColumnarBatch] {
-          private val converters = new RowToColumnConverter(localSchema)
-          private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
-            OffHeapColumnVector.allocateColumns(numRows, localSchema)
-          } else {
-            OnHeapColumnVector.allocateColumns(numRows, localSchema)
-          }
-          private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
-
-          TaskContext.get().addTaskCompletionListener[Unit] { _ =>
-            cb.close()
-          }
-
-          override def hasNext: Boolean = {
-            rowIterator.hasNext
-          }
-
-          override def next(): ColumnarBatch = {
-            cb.setNumRows(0)
-            vectors.foreach(_.reset())
-            var rowCount = 0
-            while (rowCount < numRows && rowIterator.hasNext) {
-              val row = rowIterator.next()
-              converters.convert(row, vectors.toArray)
-              rowCount += 1
+    child match {
+      case a: AdaptiveSparkPlanExec if a.finalPlanSupportsColumnar() =>
+        // if the child plan is adaptive and resulted in columnar data
+        // then we can bypass any transition
+        a.executeColumnar()
+      case _ =>
+        val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
+        val numInputRows = longMetric("numInputRows")
+        val numOutputBatches = longMetric("numOutputBatches")
+        // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
+        // combine with some of the Arrow conversion tools we will need to unify some of the
+        // configs.
+        val numRows = conf.columnBatchSize
+        // This avoids calling `schema` in the RDD closure, so that we don't need to include the
+        // entire plan (this) in the closure.
+        val localSchema = this.schema
+        child.execute().mapPartitionsInternal { rowIterator =>
+          if (rowIterator.hasNext) {
+            new Iterator[ColumnarBatch] {
+              private val converters = new RowToColumnConverter(localSchema)
+              private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
+                OffHeapColumnVector.allocateColumns(numRows, localSchema)
+              } else {
+                OnHeapColumnVector.allocateColumns(numRows, localSchema)
+              }
+              private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
+
+              TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+                cb.close()
+              }
+
+              override def hasNext: Boolean = {
+                rowIterator.hasNext
+              }
+
+              override def next(): ColumnarBatch = {
+                cb.setNumRows(0)
+                vectors.foreach(_.reset())
+                var rowCount = 0
+                while (rowCount < numRows && rowIterator.hasNext) {
+                  val row = rowIterator.next()
+                  converters.convert(row, vectors.toArray)
+                  rowCount += 1
+                }
+                cb.setNumRows(rowCount)
+                numInputRows += rowCount
+                numOutputBatches += 1
+                cb
+              }
             }
-            cb.setNumRows(rowCount)
-            numInputRows += rowCount
-            numOutputBatches += 1
-            cb
+          } else {
+            Iterator.empty
           }
         }
-      } else {
-        Iterator.empty
-      }
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 2f6619d..c03bb4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.bucketing.DisableUnnecessaryBucketedScan
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SQLPlanMetric}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -111,7 +112,7 @@ case class AdaptiveSparkPlanExec(
   @transient private val postStageCreationRules = Seq(
     ApplyColumnarRulesAndInsertTransitions(context.session.sessionState.columnarRules),
     CollapseCodegenStages()
-  )
+  ) ++ context.session.sessionState.postStageCreationRules
 
   // The partitioning of the query output depends on the shuffle(s) in the final stage. If the
   // original plan contains a repartition operator, we need to preserve the specified partitioning,
@@ -187,6 +188,13 @@ case class AdaptiveSparkPlanExec(
 
   override def doCanonicalize(): SparkPlan = inputPlan.canonicalized
 
+  // This operator reports that output is row-based but because of the adaptive nature of
+  // execution, we don't really know whether the output is going to be row-based or columnar
+  // until we start running the query, so there is a finalPlanSupportsColumnar method that
+  // can be called at execution time to determine what the output format is.
+  // This operator can safely be wrapped in either RowToColumnarExec or ColumnarToRowExec.
+  override def supportsColumnar: Boolean = false
+
   override def resetMetrics(): Unit = {
     metrics.valuesIterator.foreach(_.reset())
     executedPlan.resetMetrics()
@@ -314,27 +322,41 @@ case class AdaptiveSparkPlanExec(
   }
 
   override def executeCollect(): Array[InternalRow] = {
-    val rdd = getFinalPhysicalPlan().executeCollect()
-    finalPlanUpdate
-    rdd
+    withFinalPlanUpdate(_.executeCollect())
   }
 
   override def executeTake(n: Int): Array[InternalRow] = {
-    val rdd = getFinalPhysicalPlan().executeTake(n)
-    finalPlanUpdate
-    rdd
+    withFinalPlanUpdate(_.executeTake(n))
   }
 
   override def executeTail(n: Int): Array[InternalRow] = {
-    val rdd = getFinalPhysicalPlan().executeTail(n)
-    finalPlanUpdate
-    rdd
+    withFinalPlanUpdate(_.executeTail(n))
   }
 
   override def doExecute(): RDD[InternalRow] = {
-    val rdd = getFinalPhysicalPlan().execute()
+    withFinalPlanUpdate(_.execute())
+  }
+
+  /**
+   * Determine if the final query stage supports columnar execution. Calling this method
+   * will trigger query execution of child query stages if they have not already executed.
+   *
+   * If this method returns true then it is safe to call doExecuteColumnar to execute the
+   * final stage.
+   */
+  def finalPlanSupportsColumnar(): Boolean = {
+    getFinalPhysicalPlan().supportsColumnar
+  }
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    withFinalPlanUpdate(_.executeColumnar())
+  }
+
+  private def withFinalPlanUpdate[T](fun: SparkPlan => T): T = {
+    val plan = getFinalPhysicalPlan()
+    val result = fun(plan)
     finalPlanUpdate
-    rdd
+    result
   }
 
   override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 8289819..1c0c916 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -307,6 +307,10 @@ abstract class BaseSessionStateBuilder(
     extensions.buildQueryStagePrepRules(session)
   }
 
+  protected def postStageCreationRules: Seq[Rule[SparkPlan]] = {
+    extensions.buildPostStageCreationRules(session)
+  }
+
   /**
    * Create a query execution object.
    */
@@ -360,7 +364,8 @@ abstract class BaseSessionStateBuilder(
       createQueryExecution,
       createClone,
       columnarRules,
-      queryStagePrepRules)
+      queryStagePrepRules,
+      postStageCreationRules)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index cdf764a..7685e54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -79,7 +79,8 @@ private[sql] class SessionState(
     createQueryExecution: (LogicalPlan, CommandExecutionMode.Value) => QueryExecution,
     createClone: (SparkSession, SessionState) => SessionState,
     val columnarRules: Seq[ColumnarRule],
-    val queryStagePrepRules: Seq[Rule[SparkPlan]]) {
+    val queryStagePrepRules: Seq[Rule[SparkPlan]],
+    val postStageCreationRules: Seq[Rule[SparkPlan]]) {
 
   // The following fields are lazy to avoid creating the Hive client when creating SessionState.
   lazy val catalog: SessionCatalog = catalogBuilder()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org