Posted to commits@spark.apache.org by we...@apache.org on 2021/08/09 08:34:25 UTC
[spark] branch master updated: [SPARK-35881][SQL][FOLLOWUP] Add a
boolean flag in AdaptiveSparkPlanExec to ask for columnar output
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8714eef [SPARK-35881][SQL][FOLLOWUP] Add a boolean flag in AdaptiveSparkPlanExec to ask for columnar output
8714eef is described below
commit 8714eefe6f975e6b106b59e2ab3af53df4555dce
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Aug 9 16:33:52 2021 +0800
[SPARK-35881][SQL][FOLLOWUP] Add a boolean flag in AdaptiveSparkPlanExec to ask for columnar output
### What changes were proposed in this pull request?
This is a follow-up to https://github.com/apache/spark/pull/33140, proposing a simpler way to integrate columnar execution into AQE.
Instead of making `ColumnarToRowExec` and `RowToColumnarExec` dynamic so they can handle `AdaptiveSparkPlanExec`, it is simpler to let the consumer decide whether it needs columnar output, and pass that decision to `AdaptiveSparkPlanExec` as a boolean flag.
Spark vendors can then set the flag as appropriate in their custom columnar Parquet writing commands when the input plan is an `AdaptiveSparkPlanExec`.
One possible counterargument is that we may need to inspect the final plan of AQE and consume the data differently (in either row or columnar format). I can't think of such a use case, and I believe we can always know statically whether the AQE plan should output row-based or columnar data.
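To make the intended usage concrete, here is a minimal vendor-side sketch. The `writeColumnarBatches` and `writeRows` sinks are hypothetical stand-ins for a vendor's actual writers; the `copy(supportsColumnar = true)` and `executeColumnar()` calls reflect the API added by this patch:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical vendor sinks -- stand-ins for real columnar/row Parquet writers.
def writeColumnarBatches(batches: RDD[ColumnarBatch]): Unit = ???
def writeRows(rows: RDD[InternalRow]): Unit = ???

def writeParquet(plan: SparkPlan): Unit = plan match {
  case aqe: AdaptiveSparkPlanExec =>
    // Ask AQE for columnar output up front: with the flag set, the final
    // stage ends columnar instead of being wrapped in ColumnarToRowExec.
    writeColumnarBatches(aqe.copy(supportsColumnar = true).executeColumnar())
  case other if other.supportsColumnar =>
    writeColumnarBatches(other.executeColumnar())
  case other =>
    writeRows(other.execute())
}
```

Because the flag is set before execution, the consumer never has to inspect the final AQE plan at run time to choose between `execute()` and `executeColumnar()`.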
### Why are the changes needed?
Code simplification.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual tests.
Closes #33624 from cloud-fan/aqe.
Lead-authored-by: Wenchen Fan <we...@databricks.com>
Co-authored-by: Wenchen Fan <cl...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/sql/execution/Columnar.scala | 156 +++++++++------------
.../spark/sql/execution/QueryExecution.scala | 3 +-
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 52 +++----
.../spark/sql/execution/ColumnarRulesSuite.scala | 4 +-
4 files changed, 93 insertions(+), 122 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 406e757..a70dba8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.types._
@@ -66,9 +65,7 @@ trait ColumnarToRowTransition extends UnaryExecNode
* [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
*/
case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport {
- // child plan must be columnar or an adaptive plan, which could either be row-based or
- // columnar, but we don't know until we execute it
- assert(child.supportsColumnar || child.isInstanceOf[AdaptiveSparkPlanExec])
+ assert(child.supportsColumnar)
override def output: Seq[Attribute] = child.output
@@ -86,25 +83,18 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
)
override def doExecute(): RDD[InternalRow] = {
- child match {
- case a: AdaptiveSparkPlanExec if !a.finalPlanSupportsColumnar() =>
- // if the child plan is adaptive and resulted in rows rather than columnar data
- // then we can bypass any transition
- a.execute()
- case _ =>
- val numOutputRows = longMetric("numOutputRows")
- val numInputBatches = longMetric("numInputBatches")
- // This avoids calling `output` in the RDD closure, so that we don't need to include
- // the entire plan (this) in the closure.
- val localOutput = this.output
- child.executeColumnar().mapPartitionsInternal { batches =>
- val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
- batches.flatMap { batch =>
- numInputBatches += 1
- numOutputRows += batch.numRows()
- batch.rowIterator().asScala.map(toUnsafe)
- }
- }
+ val numOutputRows = longMetric("numOutputRows")
+ val numInputBatches = longMetric("numInputBatches")
+ // This avoids calling `output` in the RDD closure, so that we don't need to include the entire
+ // plan (this) in the closure.
+ val localOutput = this.output
+ child.executeColumnar().mapPartitionsInternal { batches =>
+ val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
+ batches.flatMap { batch =>
+ numInputBatches += 1
+ numOutputRows += batch.numRows()
+ batch.rowIterator().asScala.map(toUnsafe)
+ }
}
}
@@ -429,10 +419,6 @@ trait RowToColumnarTransition extends UnaryExecNode
* would only be to reduce code.
*/
case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
- // child plan must be row-based or an adaptive plan, which could either be row-based or
- // columnar, but we don't know until we execute it
- assert(!child.supportsColumnar || child.isInstanceOf[AdaptiveSparkPlanExec])
-
override def output: Seq[Attribute] = child.output
override def outputPartitioning: Partitioning = child.outputPartitioning
@@ -455,60 +441,52 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
)
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
- child match {
- case a: AdaptiveSparkPlanExec if a.finalPlanSupportsColumnar() =>
- // if the child plan is adaptive and resulted in columnar data
- // then we can bypass any transition
- a.executeColumnar()
- case _ =>
- val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
- val numInputRows = longMetric("numInputRows")
- val numOutputBatches = longMetric("numOutputBatches")
- // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
- // combine with some of the Arrow conversion tools we will need to unify some of the
- // configs.
- val numRows = conf.columnBatchSize
- // This avoids calling `schema` in the RDD closure, so that we don't need to include the
- // entire plan (this) in the closure.
- val localSchema = this.schema
- child.execute().mapPartitionsInternal { rowIterator =>
- if (rowIterator.hasNext) {
- new Iterator[ColumnarBatch] {
- private val converters = new RowToColumnConverter(localSchema)
- private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
- OffHeapColumnVector.allocateColumns(numRows, localSchema)
- } else {
- OnHeapColumnVector.allocateColumns(numRows, localSchema)
- }
- private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
-
- TaskContext.get().addTaskCompletionListener[Unit] { _ =>
- cb.close()
- }
-
- override def hasNext: Boolean = {
- rowIterator.hasNext
- }
-
- override def next(): ColumnarBatch = {
- cb.setNumRows(0)
- vectors.foreach(_.reset())
- var rowCount = 0
- while (rowCount < numRows && rowIterator.hasNext) {
- val row = rowIterator.next()
- converters.convert(row, vectors.toArray)
- rowCount += 1
- }
- cb.setNumRows(rowCount)
- numInputRows += rowCount
- numOutputBatches += 1
- cb
- }
- }
+ val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
+ val numInputRows = longMetric("numInputRows")
+ val numOutputBatches = longMetric("numOutputBatches")
+ // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
+ // combine with some of the Arrow conversion tools we will need to unify some of the configs.
+ val numRows = conf.columnBatchSize
+ // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
+ // plan (this) in the closure.
+ val localSchema = this.schema
+ child.execute().mapPartitionsInternal { rowIterator =>
+ if (rowIterator.hasNext) {
+ new Iterator[ColumnarBatch] {
+ private val converters = new RowToColumnConverter(localSchema)
+ private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
+ OffHeapColumnVector.allocateColumns(numRows, localSchema)
} else {
- Iterator.empty
+ OnHeapColumnVector.allocateColumns(numRows, localSchema)
+ }
+ private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
+
+ TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+ cb.close()
+ }
+
+ override def hasNext: Boolean = {
+ rowIterator.hasNext
+ }
+
+ override def next(): ColumnarBatch = {
+ cb.setNumRows(0)
+ vectors.foreach(_.reset())
+ var rowCount = 0
+ while (rowCount < numRows && rowIterator.hasNext) {
+ val row = rowIterator.next()
+ converters.convert(row, vectors.toArray)
+ rowCount += 1
+ }
+ cb.setNumRows(rowCount)
+ numInputRows += rowCount
+ numOutputBatches += 1
+ cb
}
}
+ } else {
+ Iterator.empty
+ }
}
}
@@ -519,9 +497,13 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
/**
* Apply any user defined [[ColumnarRule]]s and find the correct place to insert transitions
* to/from columnar formatted data.
+ *
+ * @param columnarRules custom columnar rules
+ * @param outputsColumnar whether or not the produced plan should output columnar format.
*/
case class ApplyColumnarRulesAndInsertTransitions(
- columnarRules: Seq[ColumnarRule])
+ columnarRules: Seq[ColumnarRule],
+ outputsColumnar: Boolean)
extends Rule[SparkPlan] {
/**
@@ -531,7 +513,7 @@ case class ApplyColumnarRulesAndInsertTransitions(
if (!plan.supportsColumnar) {
// The tree feels kind of backwards
// Columnar Processing will start here, so transition from row to columnar
- RowToColumnarExec(insertTransitions(plan))
+ RowToColumnarExec(insertTransitions(plan, outputsColumnar = false))
} else if (!plan.isInstanceOf[RowToColumnarTransition]) {
plan.withNewChildren(plan.children.map(insertRowToColumnar))
} else {
@@ -542,13 +524,15 @@ case class ApplyColumnarRulesAndInsertTransitions(
/**
* Inserts RowToColumnarExecs and ColumnarToRowExecs where needed.
*/
- private def insertTransitions(plan: SparkPlan): SparkPlan = {
- if (plan.supportsColumnar) {
- // The tree feels kind of backwards
- // This is the end of the columnar processing so go back to rows
+ private def insertTransitions(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
+ if (outputsColumnar) {
+ insertRowToColumnar(plan)
+ } else if (plan.supportsColumnar) {
+ // `outputsColumnar` is false but the plan outputs columnar format, so add a
+ // to-row transition here.
ColumnarToRowExec(insertRowToColumnar(plan))
} else if (!plan.isInstanceOf[ColumnarToRowTransition]) {
- plan.withNewChildren(plan.children.map(insertTransitions))
+ plan.withNewChildren(plan.children.map(insertTransitions(_, outputsColumnar = false)))
} else {
plan
}
@@ -558,7 +542,7 @@ case class ApplyColumnarRulesAndInsertTransitions(
var preInsertPlan: SparkPlan = plan
columnarRules.foreach((r : ColumnarRule) =>
preInsertPlan = r.preColumnarTransitions(preInsertPlan))
- var postInsertPlan = insertTransitions(preInsertPlan)
+ var postInsertPlan = insertTransitions(preInsertPlan, outputsColumnar)
columnarRules.reverse.foreach((r : ColumnarRule) =>
postInsertPlan = r.postColumnarTransitions(postInsertPlan))
postInsertPlan
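The effect of the new flag on this rule can be illustrated with the `LeafOp` helper from ColumnarRulesSuite (a sketch, assuming no custom columnar rules are registered in the session):

```scala
// LeafOp(true) is a dummy plan that reports supportsColumnar = true
// (see the ColumnarRulesSuite changes below).
val toRow = ApplyColumnarRulesAndInsertTransitions(
  spark.sessionState.columnarRules, outputsColumnar = false)
val keepColumnar = ApplyColumnarRulesAndInsertTransitions(
  spark.sessionState.columnarRules, outputsColumnar = true)

toRow(LeafOp(true))        // => ColumnarToRowExec(LeafOp(true))
keepColumnar(LeafOp(true)) // => LeafOp(true), left columnar for the consumer
```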
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 6c16dce..361a910 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -414,7 +414,8 @@ object QueryExecution {
// number of partitions when instantiating PartitioningCollection.
RemoveRedundantSorts,
DisableUnnecessaryBucketedScan,
- ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.columnarRules),
+ ApplyColumnarRulesAndInsertTransitions(
+ sparkSession.sessionState.columnarRules, outputsColumnar = false),
CollapseCodegenStages()) ++
(if (subquery) {
Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 9db4574..2c242d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -65,7 +65,8 @@ case class AdaptiveSparkPlanExec(
inputPlan: SparkPlan,
@transient context: AdaptiveExecutionContext,
@transient preprocessingRules: Seq[Rule[SparkPlan]],
- @transient isSubquery: Boolean)
+ @transient isSubquery: Boolean,
+ @transient override val supportsColumnar: Boolean = false)
extends LeafExecNode {
@transient private val lock = new Object()
@@ -99,7 +100,7 @@ case class AdaptiveSparkPlanExec(
// A list of physical plan rules to be applied before creation of query stages. The physical
// plan should reach a final status of query stages (i.e., no more addition or removal of
// Exchange nodes) after running these rules.
- private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
+ @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
RemoveRedundantProjects,
// For cases like `df.repartition(a, b).select(c)`, there is no distribution requirement for
// the final plan, but we do need to respect the user-specified repartition. Here we ask
@@ -124,12 +125,14 @@ case class AdaptiveSparkPlanExec(
OptimizeShuffleWithLocalRead
)
+ @transient private val staticPostStageCreationRules: Seq[Rule[SparkPlan]] =
+ CollapseCodegenStages() +: context.session.sessionState.postStageCreationRules
+
// A list of physical optimizer rules to be applied right after a new stage is created. The input
// plan to these rules has exchange as its root node.
- @transient private val postStageCreationRules = Seq(
- ApplyColumnarRulesAndInsertTransitions(context.session.sessionState.columnarRules),
- CollapseCodegenStages()
- ) ++ context.session.sessionState.postStageCreationRules
+ private def postStageCreationRules(outputsColumnar: Boolean) =
+ ApplyColumnarRulesAndInsertTransitions(
+ context.session.sessionState.columnarRules, outputsColumnar) +: staticPostStageCreationRules
private def optimizeQueryStage(plan: SparkPlan, isFinalStage: Boolean): SparkPlan = {
val optimized = queryStageOptimizerRules.foldLeft(plan) { case (latestPlan, rule) =>
@@ -196,13 +199,6 @@ case class AdaptiveSparkPlanExec(
override def doCanonicalize(): SparkPlan = inputPlan.canonicalized
- // This operator reports that output is row-based but because of the adaptive nature of
- // execution, we don't really know whether the output is going to row-based or columnar
- // until we start running the query, so there is a finalPlanSupportsColumnar method that
- // can be called at execution time to determine what the output format is.
- // This operator can safely be wrapped in either RowToColumnarExec or ColumnarToRowExec.
- override def supportsColumnar: Boolean = false
-
override def resetMetrics(): Unit = {
metrics.valuesIterator.foreach(_.reset())
executedPlan.resetMetrics()
@@ -313,7 +309,7 @@ case class AdaptiveSparkPlanExec(
// Run the final plan when there's no more unfinished stages.
currentPhysicalPlan = applyPhysicalRules(
optimizeQueryStage(result.newPlan, isFinalStage = true),
- postStageCreationRules,
+ postStageCreationRules(supportsColumnar),
Some((planChangeLogger, "AQE Post Stage Creation")))
isFinalPlan = true
executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
@@ -348,21 +344,17 @@ case class AdaptiveSparkPlanExec(
withFinalPlanUpdate(_.execute())
}
- /**
- * Determine if the final query stage supports columnar execution. Calling this method
- * will trigger query execution of child query stages if they have not already executed.
- *
- * If this method returns true then it is safe to call doExecuteColumnar to execute the
- * final stage.
- */
- def finalPlanSupportsColumnar(): Boolean = {
- getFinalPhysicalPlan().supportsColumnar
- }
-
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
withFinalPlanUpdate(_.executeColumnar())
}
+ override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
+ withFinalPlanUpdate { finalPlan =>
+ assert(finalPlan.isInstanceOf[BroadcastQueryStageExec])
+ finalPlan.doExecuteBroadcast()
+ }
+ }
+
private def withFinalPlanUpdate[T](fun: SparkPlan => T): T = {
val plan = getFinalPhysicalPlan()
val result = fun(plan)
@@ -370,12 +362,6 @@ case class AdaptiveSparkPlanExec(
result
}
- override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
- val finalPlan = getFinalPhysicalPlan()
- assert(finalPlan.isInstanceOf[BroadcastQueryStageExec])
- finalPlan.doExecuteBroadcast()
- }
-
protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
override def generateTreeString(
@@ -536,7 +522,7 @@ case class AdaptiveSparkPlanExec(
case s: ShuffleExchangeLike =>
val newShuffle = applyPhysicalRules(
s.withNewChildren(Seq(optimizedPlan)),
- postStageCreationRules,
+ postStageCreationRules(outputsColumnar = false),
Some((planChangeLogger, "AQE Post Stage Creation")))
if (!newShuffle.isInstanceOf[ShuffleExchangeLike]) {
throw new IllegalStateException(
@@ -546,7 +532,7 @@ case class AdaptiveSparkPlanExec(
case b: BroadcastExchangeLike =>
val newBroadcast = applyPhysicalRules(
b.withNewChildren(Seq(optimizedPlan)),
- postStageCreationRules,
+ postStageCreationRules(outputsColumnar = false),
Some((planChangeLogger, "AQE Post Stage Creation")))
if (!newBroadcast.isInstanceOf[BroadcastExchangeLike]) {
throw new IllegalStateException(
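Putting the AQE-side changes together: only the final stage honors the flag, while intermediate shuffle and broadcast stages are always finalized with `outputsColumnar = false`. A construction sketch, where the input values are hypothetical placeholders and only the last argument is new:

```scala
// Hypothetical construction site, e.g. inside a vendor's write command.
val aqe = AdaptiveSparkPlanExec(
  inputPlan = physicalPlan,      // hypothetical: the physical plan to execute
  context = adaptiveContext,     // hypothetical: an AdaptiveExecutionContext
  preprocessingRules = Nil,
  isSubquery = false,
  supportsColumnar = true)       // new flag: keep the final stage columnar

// The final stage is finalized with postStageCreationRules(supportsColumnar),
// so calling executeColumnar() on this node is guaranteed to be valid.
val batches = aqe.executeColumnar()
```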
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala
index df08acd..75223a7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala
@@ -27,7 +27,7 @@ class ColumnarRulesSuite extends PlanTest with SharedSparkSession {
test("Idempotency of columnar rules - RowToColumnar/ColumnarToRow") {
val rules = ApplyColumnarRulesAndInsertTransitions(
- spark.sessionState.columnarRules)
+ spark.sessionState.columnarRules, false)
val plan = UnaryOp(UnaryOp(LeafOp(false), true), false)
val expected =
@@ -40,7 +40,7 @@ class ColumnarRulesSuite extends PlanTest with SharedSparkSession {
test("Idempotency of columnar rules - ColumnarToRow/RowToColumnar") {
val rules = ApplyColumnarRulesAndInsertTransitions(
- spark.sessionState.columnarRules)
+ spark.sessionState.columnarRules, false)
val plan = UnaryOp(UnaryOp(LeafOp(true), false), true)
val expected = ColumnarToRowExec(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org