Posted to reviews@spark.apache.org by viirya <gi...@git.apache.org> on 2017/11/15 13:37:21 UTC
[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...
GitHub user viirya opened a pull request:
https://github.com/apache/spark/pull/19756
[SPARK-22527][SQL] Reuse coordinated exchanges if possible
## What changes were proposed in this pull request?
A `ShuffleExchange` without a coordinator can be reused, but a coordinated `ShuffleExchange` currently can't.
For an example query:
```scala
// Assume SQLConf.ADAPTIVE_EXECUTION_ENABLED is enabled.
val query =
  """SELECT name, sum(amount) FROM table1 GROUP BY name
    |UNION ALL SELECT name, sum(amount) FROM table1 GROUP BY name
    |UNION ALL SELECT name, sum(amount) FROM table1 GROUP BY name
    |""".stripMargin
sql(query)
```
The executed plan looks like:
```
Union
:- *HashAggregate(keys=[name#17], functions=[sum(cast(amount#16 as bigint))], output=[name#17, sum(amount)#21L])
:  +- Exchange(coordinator id: 1962297942) hashpartitioning(name#17, 5), coordinator[target post-shuffle partition size: 67108864]
:     +- *HashAggregate(keys=[name#17], functions=[partial_sum(cast(amount#16 as bigint))], output=[name#17, sum#28L])
:        +- *FileScan parquet default.table1[amount#16,name#17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<amount:int,name:string>
:- *HashAggregate(keys=[name#17], functions=[sum(cast(amount#16 as bigint))], output=[name#17, sum(amount)#22L])
:  +- Exchange(coordinator id: 1500445863) hashpartitioning(name#17, 5), coordinator[target post-shuffle partition size: 67108864]
:     +- *HashAggregate(keys=[name#17], functions=[partial_sum(cast(amount#16 as bigint))], output=[name#17, sum#30L])
:        +- *FileScan parquet default.table1[amount#16,name#17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<amount:int,name:string>
+- *HashAggregate(keys=[name#17], functions=[sum(cast(amount#16 as bigint))], output=[name#17, sum(amount)#23L])
   +- Exchange(coordinator id: 1275650533) hashpartitioning(name#17, 5), coordinator[target post-shuffle partition size: 67108864]
      +- *HashAggregate(keys=[name#17], functions=[partial_sum(cast(amount#16 as bigint))], output=[name#17, sum#32L])
         +- *FileScan parquet default.table1[amount#16,name#17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<amount:int,name:string>
```
We should be able to reuse coordinated `ShuffleExchange`s, like:
```
Union
:- *HashAggregate(keys=[name#17], functions=[sum(cast(amount#16 as bigint))], output=[name#17, sum(amount)#21L])
:  +- Exchange(coordinator id: 260303313) hashpartitioning(name#17, 5), coordinator[target post-shuffle partition size: 67108864]
:     +- *HashAggregate(keys=[name#17], functions=[partial_sum(cast(amount#16 as bigint))], output=[name#17, sum#28L])
:        +- *FileScan parquet default.table1[amount#16,name#17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<amount:int,name:string>
:- *HashAggregate(keys=[name#17], functions=[sum(cast(amount#16 as bigint))], output=[name#17, sum(amount)#22L])
:  +- ReusedExchange [name#17, sum#30L], Exchange(coordinator id: 260303313) hashpartitioning(name#17, 5), coordinator[target post-shuffle partition size: 67108864]
+- *HashAggregate(keys=[name#17], functions=[sum(cast(amount#16 as bigint))], output=[name#17, sum(amount)#23L])
   +- ReusedExchange [name#17, sum#32L], Exchange(coordinator id: 260303313) hashpartitioning(name#17, 5), coordinator[target post-shuffle partition size: 67108864]
```
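The reuse condition the patch checks can be sketched with a self-contained toy model. The `Exchange` class and `sameResult` below are hypothetical stand-ins for Spark's `ShuffleExchangeExec`, not the real API:

```scala
// Toy model of the reuse condition -- hypothetical classes, not Spark's.
case class Exchange(partitioning: String, numPartitions: Int, coordinated: Boolean) {
  // Canonical form with the coordinator stripped, mirroring `withoutCoordinator`.
  def withoutCoordinator: Exchange = copy(coordinated = false)
  def sameResult(other: Exchange): Boolean = this == other
}

// Two coordinated groups are interchangeable only when their exchanges match
// pairwise after stripping the coordinator (as in hasExchangesWithSameResults).
def sameCoordinatedResults(source: Seq[Exchange], target: Seq[Exchange]): Boolean =
  source.length == target.length &&
    source.zip(target).forall { case (s, t) =>
      s.withoutCoordinator.sameResult(t.withoutCoordinator)
    }

val g1 = Seq(Exchange("hashpartitioning(name, 5)", 5, coordinated = true))
val g2 = Seq(Exchange("hashpartitioning(name, 5)", 5, coordinated = true))
```

Under this model the three identical `Exchange` branches in the plan above compare equal, so the second and third can become `ReusedExchange` nodes.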
## How was this patch tested?
Added a test.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/viirya/spark-1 SPARK-22527
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/19756.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #19756
----
commit 069699af84e2ffc133a7cfa56bc682e0f62f93a8
Author: Liang-Chi Hsieh <vi...@gmail.com>
Date: 2017-11-15T10:16:05Z
Reuse coordinated exchanges if possible.
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19756
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83894/
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/19756
The exchanges only register with `ExchangeCoordinator` after query transformation, so during transformation we can't tell whether two `ExchangeCoordinator`s coordinate the same exchanges.
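The timing issue can be pictured with a minimal toy coordinator (a hypothetical sketch, not Spark's `ExchangeCoordinator` API):

```scala
// Hypothetical model of the registration timing issue: during plan
// transformation the coordinator's exchange list is still empty, so
// comparing coordinators by their registered exchanges is meaningless.
class Coordinator {
  private val registered = scala.collection.mutable.ArrayBuffer[String]()
  def register(exchange: String): Unit = registered += exchange
  def exchanges: Seq[String] = registered.toSeq
}

val c1 = new Coordinator
val c2 = new Coordinator
// At transformation time nothing is registered yet, so c1 and c2
// cannot be distinguished by their exchanges:
println(c1.exchanges.isEmpty && c2.exchanges.isEmpty)
// Registration only happens later, when the plan starts executing:
c1.register("Exchange A")
c2.register("Exchange A")
```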
---
[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19756#discussion_r151281258
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -109,3 +109,67 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
    }
  }
}
+
+/**
+ * Find out duplicated coordinated exchanges in the spark plan, then use the same exchange for all
+ * the references.
+ */
+case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] {
+
+  // Returns true if a SparkPlan has coordinated ShuffleExchangeExec children.
+  private def hasCoordinatedExchanges(plan: SparkPlan): Boolean = {
+    plan.children.nonEmpty && plan.children.forall {
+      case ShuffleExchangeExec(_, _, Some(_)) => true
+      case _ => false
+    }
+  }
+
+  // Returns true if two sequences of exchanges are producing the same results.
+  private def hasExchangesWithSameResults(
+      source: Seq[ShuffleExchangeExec],
+      target: Seq[ShuffleExchangeExec]): Boolean = {
+    source.length == target.length &&
+      source.zip(target).forall(x => x._1.withoutCoordinator.sameResult(x._2.withoutCoordinator))
--- End diff --
Actually I think this can be relaxed to:
```scala
val sourceWithoutCoordinator = source.map(_.withoutCoordinator)
val targetWithoutCoordinator = target.map(_.withoutCoordinator)
source.length == target.length && sourceWithoutCoordinator.forall(s => targetWithoutCoordinator.exists(_.sameResult(s)))
```
For simplicity and safety, I leave it as is.
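The difference between the relaxed, order-insensitive variant and the strict zip-based one can be checked with a toy model (the `Exchange` below is a hypothetical stand-in, with `sameResult` as plain equality):

```scala
// Hypothetical stand-in for ShuffleExchangeExec; sameResult is equality here.
case class Exchange(partitioning: String) {
  def withoutCoordinator: Exchange = this
  def sameResult(other: Exchange): Boolean = this == other
}

// Strict: exchanges must match pairwise, in order.
def strictSame(source: Seq[Exchange], target: Seq[Exchange]): Boolean =
  source.length == target.length &&
    source.zip(target).forall { case (s, t) =>
      s.withoutCoordinator.sameResult(t.withoutCoordinator)
    }

// Relaxed: each source exchange only needs SOME matching target exchange.
def relaxedSame(source: Seq[Exchange], target: Seq[Exchange]): Boolean = {
  val s = source.map(_.withoutCoordinator)
  val t = target.map(_.withoutCoordinator)
  source.length == target.length && s.forall(x => t.exists(_.sameResult(x)))
}

val a = Seq(Exchange("hash(x, 5)"), Exchange("hash(y, 5)"))
val b = Seq(Exchange("hash(y, 5)"), Exchange("hash(x, 5)")) // same set, reordered
```

The relaxed check accepts the reordered group while the strict one rejects it, which is exactly the trade-off described in the comment.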
---
[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/19756#discussion_r152992616
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
@@ -108,6 +108,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
      EnsureRequirements(sparkSession.sessionState.conf),
      CollapseCodegenStages(sparkSession.sessionState.conf),
      ReuseExchange(sparkSession.sessionState.conf),
+      ReuseExchangeWithCoordinator(sparkSession.sessionState.conf),
--- End diff --
Here we can run `ReuseExchange` again.
---
[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...
Posted by cenyuhai <gi...@git.apache.org>.
Github user cenyuhai commented on a diff in the pull request:
https://github.com/apache/spark/pull/19756#discussion_r151611386
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -109,3 +109,67 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
    }
  }
}
+
+/**
+ * Find out duplicated coordinated exchanges in the spark plan, then use the same exchange for all
+ * the references.
+ */
+case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] {
+
+  // Returns true if a SparkPlan has coordinated ShuffleExchangeExec children.
+  private def hasCoordinatedExchanges(plan: SparkPlan): Boolean = {
+    plan.children.nonEmpty && plan.children.forall {
+      case ShuffleExchangeExec(_, _, Some(_)) => true
+      case _ => false
+    }
+  }
+
+  // Returns true if two sequences of exchanges are producing the same results.
+  private def hasExchangesWithSameResults(
+      source: Seq[ShuffleExchangeExec],
+      target: Seq[ShuffleExchangeExec]): Boolean = {
+    source.length == target.length &&
+      source.zip(target).forall(x => x._1.withoutCoordinator.sameResult(x._2.withoutCoordinator))
+  }
+
+  type CoordinatorSignature = (Int, Long, Option[Int])
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.exchangeReuseEnabled) {
--- End diff --
`exchangeReuseEnabled` still has a bug (SPARK-20295); can we use a new configuration?
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/19756
Can you also explain the cause of this bug instead of just describing it? Looking at your code, it seems we could just fix `Exchange.sameResult` to exclude the coordinator?
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19756
**[Test build #83894 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83894/testReport)** for PR 19756 at commit [`069699a`](https://github.com/apache/spark/commit/069699af84e2ffc133a7cfa56bc682e0f62f93a8).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
* `case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] `
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/19756
then can't we correctly implement `equals` for the coordinator?
---
[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...
Posted by cenyuhai <gi...@git.apache.org>.
Github user cenyuhai commented on a diff in the pull request:
https://github.com/apache/spark/pull/19756#discussion_r151615572
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -109,3 +109,67 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
    }
  }
}
+
+/**
+ * Find out duplicated coordinated exchanges in the spark plan, then use the same exchange for all
+ * the references.
+ */
+case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] {
+
+  // Returns true if a SparkPlan has coordinated ShuffleExchangeExec children.
+  private def hasCoordinatedExchanges(plan: SparkPlan): Boolean = {
+    plan.children.nonEmpty && plan.children.forall {
+      case ShuffleExchangeExec(_, _, Some(_)) => true
+      case _ => false
+    }
+  }
+
+  // Returns true if two sequences of exchanges are producing the same results.
+  private def hasExchangesWithSameResults(
+      source: Seq[ShuffleExchangeExec],
+      target: Seq[ShuffleExchangeExec]): Boolean = {
+    source.length == target.length &&
+      source.zip(target).forall(x => x._1.withoutCoordinator.sameResult(x._2.withoutCoordinator))
+  }
+
+  type CoordinatorSignature = (Int, Long, Option[Int])
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.exchangeReuseEnabled) {
--- End diff --
ok
---
[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...
Posted by cenyuhai <gi...@git.apache.org>.
Github user cenyuhai commented on a diff in the pull request:
https://github.com/apache/spark/pull/19756#discussion_r151612678
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -109,3 +109,67 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
    }
  }
}
+
+/**
+ * Find out duplicated coordinated exchanges in the spark plan, then use the same exchange for all
+ * the references.
+ */
+case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] {
+
+  // Returns true if a SparkPlan has coordinated ShuffleExchangeExec children.
+  private def hasCoordinatedExchanges(plan: SparkPlan): Boolean = {
+    plan.children.nonEmpty && plan.children.forall {
+      case ShuffleExchangeExec(_, _, Some(_)) => true
+      case _ => false
+    }
+  }
+
+  // Returns true if two sequences of exchanges are producing the same results.
+  private def hasExchangesWithSameResults(
+      source: Seq[ShuffleExchangeExec],
+      target: Seq[ShuffleExchangeExec]): Boolean = {
+    source.length == target.length &&
+      source.zip(target).forall(x => x._1.withoutCoordinator.sameResult(x._2.withoutCoordinator))
+  }
+
+  type CoordinatorSignature = (Int, Long, Option[Int])
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.exchangeReuseEnabled) {
--- End diff --
I don't know whether Spark 2.2 still has this bug or not. I am using Spark 2.1.
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/19756
ping @cloud-fan @hvanhovell
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19756#discussion_r153065340
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
@@ -108,6 +108,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
      EnsureRequirements(sparkSession.sessionState.conf),
      CollapseCodegenStages(sparkSession.sessionState.conf),
      ReuseExchange(sparkSession.sessionState.conf),
+      ReuseExchangeWithCoordinator(sparkSession.sessionState.conf),
--- End diff --
At this point, exchanges are not registered yet. Registration happens only after all query transformations.
---
[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19756#discussion_r151614576
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -109,3 +109,67 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
    }
  }
}
+
+/**
+ * Find out duplicated coordinated exchanges in the spark plan, then use the same exchange for all
+ * the references.
+ */
+case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] {
+
+  // Returns true if a SparkPlan has coordinated ShuffleExchangeExec children.
+  private def hasCoordinatedExchanges(plan: SparkPlan): Boolean = {
+    plan.children.nonEmpty && plan.children.forall {
+      case ShuffleExchangeExec(_, _, Some(_)) => true
+      case _ => false
+    }
+  }
+
+  // Returns true if two sequences of exchanges are producing the same results.
+  private def hasExchangesWithSameResults(
+      source: Seq[ShuffleExchangeExec],
+      target: Seq[ShuffleExchangeExec]): Boolean = {
+    source.length == target.length &&
+      source.zip(target).forall(x => x._1.withoutCoordinator.sameResult(x._2.withoutCoordinator))
+  }
+
+  type CoordinatorSignature = (Int, Long, Option[Int])
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.exchangeReuseEnabled) {
--- End diff --
The partial query plan posted there looks suspicious too. You can check my reply there.
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/19756
My first thought was also to fix `Exchange.sameResult`, but I soon realized that it is not the right fix. The reason is that the results of coordinated exchanges depend on all exchanges coordinated by the same coordinator. Even if `Exchange.sameResult` returns true for two exchanges after excluding their coordinators, the coordinated shuffle results can differ when the exchanges have different siblings.
For example, consider two groups of coordinated exchanges:
```
RootNode
- Node 1
-- Exchange A (coordinator 1)
-- Exchange B (coordinator 1)
...
- Node 2
-- Exchange C (coordinator 2)
-- Exchange D (coordinator 2)
-- Exchange E (coordinator 2)
```
Say exchanges A and D produce the same result if we don't consider the coordinators. Even so, we can't replace exchange D with exchange A, because exchange D's shuffle partitions might differ from exchange A's due to coordination.
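The arithmetic behind this can be sketched as follows. The numbers and the `postShufflePartitions` helper are made up for illustration; the real logic lives in `ExchangeCoordinator`:

```scala
// The coordinator picks the post-shuffle partition count from the COMBINED
// statistics of all its registered exchanges, so an exchange that looks
// identical in isolation can end up partitioned differently under a
// different coordinator. All numbers here are hypothetical.
val targetSize = 64L // target post-shuffle partition size, in bytes

def postShufflePartitions(groupBytes: Seq[Long]): Int =
  math.max(1, (groupBytes.sum / targetSize).toInt)

val coordinator1 = Seq(100L, 900L)       // exchanges A and B
val coordinator2 = Seq(100L, 100L, 100L) // exchanges C, D and E

// A and D both shuffle 100 bytes, yet coordination gives their groups
// different partition counts: 1000 / 64 = 15 vs 300 / 64 = 4.
```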
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/19756
ping @cloud-fan
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/19756
ping @cloud-fan Please take a look. Thanks.
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/19756
Merged build finished. Test PASSed.
---
[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...
Posted by viirya <gi...@git.apache.org>.
Github user viirya closed the pull request at:
https://github.com/apache/spark/pull/19756
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/19756
can we run `ReuseExchange` twice?
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/19756
@cloud-fan Do you have some other comments on this? Or shall we close this?
---
[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/19756#discussion_r151612155
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -109,3 +109,67 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
    }
  }
}
+
+/**
+ * Find out duplicated coordinated exchanges in the spark plan, then use the same exchange for all
+ * the references.
+ */
+case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] {
+
+  // Returns true if a SparkPlan has coordinated ShuffleExchangeExec children.
+  private def hasCoordinatedExchanges(plan: SparkPlan): Boolean = {
+    plan.children.nonEmpty && plan.children.forall {
+      case ShuffleExchangeExec(_, _, Some(_)) => true
+      case _ => false
+    }
+  }
+
+  // Returns true if two sequences of exchanges are producing the same results.
+  private def hasExchangesWithSameResults(
+      source: Seq[ShuffleExchangeExec],
+      target: Seq[ShuffleExchangeExec]): Boolean = {
+    source.length == target.length &&
+      source.zip(target).forall(x => x._1.withoutCoordinator.sameResult(x._2.withoutCoordinator))
+  }
+
+  type CoordinatorSignature = (Int, Long, Option[Int])
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    if (!conf.exchangeReuseEnabled) {
--- End diff --
I replied on that JIRA ticket. I'm not sure the reported issue still exists in 2.2.
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/19756
cc @cloud-fan for review.
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/19756
**[Test build #83894 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83894/testReport)** for PR 19756 at commit [`069699a`](https://github.com/apache/spark/commit/069699af84e2ffc133a7cfa56bc682e0f62f93a8).
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/19756
ping @cloud-fan
---
[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/19756
Do you mean we manually run `ReuseExchange` after the registration of exchanges?
---