Posted to reviews@spark.apache.org by maropu <gi...@git.apache.org> on 2018/08/02 03:07:45 UTC
[GitHub] spark pull request #21754: [SPARK-24705][SQL] Cannot reuse an exchange opera...
Github user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/21754#discussion_r207090221
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
// Ignore this wrapper for canonicalizing.
override def doCanonicalize(): SparkPlan = child.canonicalized
+ override protected def doPrepare(): Unit = {
+ child match {
+ case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+ coordinator.registerExchange(shuffleExchange)
--- End diff --
`EnsureRequirements` sets the number of exchanges in `ExchangeCoordinator` before `ReuseExchange`;
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L85
For example, in the test in this pr, it sets `3` in `ExchangeCoordinator`;
https://github.com/apache/spark/pull/21754/files#diff-3cd46a3f60c5352282bd3f2c9efff7fcR505
`ReuseExchange` then reuses some exchanges, so the actual number of registered exchanges changes, e.g., in the test in this pr, the number changes from `3` to `2`.
As a result, the assertion below in `ExchangeCoordinator` fails because the logical number of exchanges and the actual number of registered exchanges no longer match;
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala#L201
The objective of this fix is to respect the number of reused exchanges in `ExchangeCoordinator`.
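
To make the mismatch concrete, here is a minimal, self-contained sketch of the counting problem. It is a toy model, not Spark's actual classes: `ToyExchange`, `ToyCoordinator`, and `ReuseCountSketch` are hypothetical names, and which exchange gets reused is an assumption for illustration. It only shows why an expected count of `3` fails against `2` registrations, and how having the reuse wrapper register its child (as `doPrepare` does in the diff above) brings the count back in line.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a shuffle exchange; only its identity matters here.
final case class ToyExchange(id: Int)

// Toy coordinator (not Spark's ExchangeCoordinator): it is told at planning
// time how many exchanges to expect and later compares that to registrations.
final class ToyCoordinator(val expectedExchanges: Int) {
  private val registered = ArrayBuffer.empty[ToyExchange]

  def register(e: ToyExchange): Unit = registered += e

  def registeredCount: Int = registered.length

  // The kind of check the assertion performs: expected == actually registered.
  def countsMatch: Boolean = expectedExchanges == registered.length
}

object ReuseCountSketch {
  // Without the fix: 3 exchanges were planned, one was replaced by a reuse
  // wrapper that never registers, so only 2 registrations arrive.
  def withoutFix(): ToyCoordinator = {
    val c = new ToyCoordinator(expectedExchanges = 3)
    Seq(ToyExchange(1), ToyExchange(2)).foreach(c.register)
    c
  }

  // With the fix: the reuse wrapper also registers the exchange it wraps
  // (assumed here to be exchange 1), so 3 registrations arrive.
  def withFix(): ToyCoordinator = {
    val c = new ToyCoordinator(expectedExchanges = 3)
    val e1 = ToyExchange(1)
    val e2 = ToyExchange(2)
    Seq(e1, e2).foreach(c.register)
    c.register(e1) // registration coming from the reuse wrapper
    c
  }
}
```

In this sketch `ReuseCountSketch.withoutFix().countsMatch` is false, while `ReuseCountSketch.withFix().countsMatch` is true.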