Posted to reviews@spark.apache.org by viirya <gi...@git.apache.org> on 2017/11/15 13:37:21 UTC

[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...

GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/19756

    [SPARK-22527][SQL] Reuse coordinated exchanges if possible

    ## What changes were proposed in this pull request?
    
    A ShuffleExchange without a coordinator can be reused, but a coordinated ShuffleExchange currently cannot.
    
    For example, consider this query:
    
    ```scala
    // Assume SQLConf.ADAPTIVE_EXECUTION_ENABLED is enabled.
    val query =
      """SELECT name, sum(amount) FROM table1 GROUP BY name
        |UNION ALL SELECT name, sum(amount) FROM table1 GROUP BY name
        |UNION ALL SELECT name, sum(amount) FROM table1 GROUP BY name
        |""".stripMargin
    sql(query)
    ```
    
    The executed plan looks like:
    
    ```
    Union
    :- *HashAggregate(keys=[name#17], functions=[sum(cast(amount#16 as bigint))], output=[name#17, sum(amount)#21L])
    :  +- Exchange(coordinator id: 1962297942) hashpartitioning(name#17, 5), coordinator[target post-shuffle partition size: 67108864]
    :     +- *HashAggregate(keys=[name#17], functions=[partial_sum(cast(amount#16 as bigint))], output=[name#17, sum#28L])
    :        +- *FileScan parquet default.table1[amount#16,name#17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<amount:int,name:string>
    :- *HashAggregate(keys=[name#17], functions=[sum(cast(amount#16 as bigint))], output=[name#17, sum(amount)#22L])
    :  +- Exchange(coordinator id: 1500445863) hashpartitioning(name#17, 5), coordinator[target post-shuffle partition size: 67108864]
    :     +- *HashAggregate(keys=[name#17], functions=[partial_sum(cast(amount#16 as bigint))], output=[name#17, sum#30L])
    :        +- *FileScan parquet default.table1[amount#16,name#17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<amount:int,name:string>
    +- *HashAggregate(keys=[name#17], functions=[sum(cast(amount#16 as bigint))], output=[name#17, sum(amount)#23L])
       +- Exchange(coordinator id: 1275650533) hashpartitioning(name#17, 5), coordinator[target post-shuffle partition size: 67108864]
          +- *HashAggregate(keys=[name#17], functions=[partial_sum(cast(amount#16 as bigint))], output=[name#17, sum#32L])
             +- *FileScan parquet default.table1[amount#16,name#17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<amount:int,name:string>
    ```
    
    We should be able to reuse a coordinated ShuffleExchange, like this:
    
    ```
    Union
    :- *HashAggregate(keys=[name#17], functions=[sum(cast(amount#16 as bigint))], output=[name#17, sum(amount)#21L])
    :  +- Exchange(coordinator id: 260303313) hashpartitioning(name#17, 5), coordinator[target post-shuffle partition size: 67108864]
    :     +- *HashAggregate(keys=[name#17], functions=[partial_sum(cast(amount#16 as bigint))], output=[name#17, sum#28L])
    :        +- *FileScan parquet default.table1[amount#16,name#17] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<amount:int,name:string>
    :- *HashAggregate(keys=[name#17], functions=[sum(cast(amount#16 as bigint))], output=[name#17, sum(amount)#22L])
    :  +- ReusedExchange [name#17, sum#30L], Exchange(coordinator id: 260303313) hashpartitioning(name#17, 5), coordinator[target post-shuffle partition size: 67108864]
    +- *HashAggregate(keys=[name#17], functions=[sum(cast(amount#16 as bigint))], output=[name#17, sum(amount)#23L])
       +- ReusedExchange [name#17, sum#32L], Exchange(coordinator id: 260303313) hashpartitioning(name#17, 5), coordinator[target post-shuffle partition size: 67108864]
    ```
    
    
    
    
    ## How was this patch tested?
    
    Added test.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 SPARK-22527

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19756.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19756
    
----
commit 069699af84e2ffc133a7cfa56bc682e0f62f93a8
Author: Liang-Chi Hsieh <vi...@gmail.com>
Date:   2017-11-15T10:16:05Z

    Reuse coordinated exchanges if possible.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83894/


---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    The exchanges only register with their `ExchangeCoordinator` after query transformation. So during transformation, we don't know whether two `ExchangeCoordinator`s coordinate the same exchanges.


---



[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19756#discussion_r151281258
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -109,3 +109,67 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
         }
       }
     }
    +
    +/**
    + * Find out duplicated coordinated exchanges in the spark plan, then use the same exchange for all
    + * the references.
    + */
    +case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] {
    +
    +  // Returns true if a SparkPlan has coordinated ShuffleExchangeExec children.
    +  private def hasCoordinatedExchanges(plan: SparkPlan): Boolean = {
    +    plan.children.nonEmpty && plan.children.forall {
    +      case ShuffleExchangeExec(_, _, Some(_)) => true
    +      case _ => false
    +    }
    +  }
    +
    +  // Returns true if two sequences of exchanges are producing the same results.
    +  private def hasExchangesWithSameResults(
    +      source: Seq[ShuffleExchangeExec],
    +      target: Seq[ShuffleExchangeExec]): Boolean = {
    +    source.length == target.length &&
    +      source.zip(target).forall(x => x._1.withoutCoordinator.sameResult(x._2.withoutCoordinator))
    --- End diff --
    
    Actually I think this can be relaxed to:
    
    ```scala
    val sourceWithoutCoordinator = source.map(_.withoutCoordinator)
    val targetWithoutCoordinator = target.map(_.withoutCoordinator)
    source.length == target.length && sourceWithoutCoordinator.forall(s => targetWithoutCoordinator.exists(_.sameResult(s)))
    ```
    
    For simplicity and safety, I am leaving it as is.
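    
    The difference between the positional check in the diff and the relaxed check above can be sketched with a toy model. This is only an illustration: `ToyExchange`, its `canonical` field, and the `ExchangeMatching` object are hypothetical stand-ins, not Spark classes, and equality of `canonical` merely plays the role of `sameResult`.
    
    ```scala
    // Hypothetical stand-in for an exchange node: two exchanges "produce the
    // same result" when their canonical descriptions are equal.
    final case class ToyExchange(canonical: String) {
      def sameResult(other: ToyExchange): Boolean = canonical == other.canonical
    }
    
    object ExchangeMatching {
      // Strict check, shaped like the one in the diff: same length and a
      // pairwise match by position.
      def strictMatch(source: Seq[ToyExchange], target: Seq[ToyExchange]): Boolean =
        source.length == target.length &&
          source.zip(target).forall { case (s, t) => s.sameResult(t) }
    
      // Relaxed check, shaped like the one in the comment: same length and every
      // source exchange has some matching target, regardless of position.
      def relaxedMatch(source: Seq[ToyExchange], target: Seq[ToyExchange]): Boolean =
        source.length == target.length &&
          source.forall(s => target.exists(_.sameResult(s)))
    }
    ```
    
    In this model the relaxed check accepts reordered children that the strict check rejects. It also accepts a source of `[a, a]` against a target of `[a, b]`, because it never verifies that every target exchange is matched, which is one reason the positional version is the safer choice.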


---



[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19756#discussion_r152992616
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
    @@ -108,6 +108,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
         EnsureRequirements(sparkSession.sessionState.conf),
         CollapseCodegenStages(sparkSession.sessionState.conf),
         ReuseExchange(sparkSession.sessionState.conf),
    +    ReuseExchangeWithCoordinator(sparkSession.sessionState.conf),
    --- End diff --
    
    We can put another `ReuseExchange` here.


---



[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...

Posted by cenyuhai <gi...@git.apache.org>.
Github user cenyuhai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19756#discussion_r151611386
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -109,3 +109,67 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
         }
       }
     }
    +
    +/**
    + * Find out duplicated coordinated exchanges in the spark plan, then use the same exchange for all
    + * the references.
    + */
    +case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] {
    +
    +  // Returns true if a SparkPlan has coordinated ShuffleExchangeExec children.
    +  private def hasCoordinatedExchanges(plan: SparkPlan): Boolean = {
    +    plan.children.nonEmpty && plan.children.forall {
    +      case ShuffleExchangeExec(_, _, Some(_)) => true
    +      case _ => false
    +    }
    +  }
    +
    +  // Returns true if two sequences of exchanges are producing the same results.
    +  private def hasExchangesWithSameResults(
    +      source: Seq[ShuffleExchangeExec],
    +      target: Seq[ShuffleExchangeExec]): Boolean = {
    +    source.length == target.length &&
    +      source.zip(target).forall(x => x._1.withoutCoordinator.sameResult(x._2.withoutCoordinator))
    +  }
    +
    +  type CoordinatorSignature = (Int, Long, Option[Int])
    +
    +  def apply(plan: SparkPlan): SparkPlan = {
    +    if (!conf.exchangeReuseEnabled) {
    --- End diff --
    
    `exchangeReuseEnabled` still has a bug (SPARK-20295). Can we use a new configuration?


---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    Can you also explain the cause of this bug instead of just describing it? Looking at your code, it seems we could just fix `Exchange.sameResult` to exclude the coordinator?


---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    **[Test build #83894 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83894/testReport)** for PR 19756 at commit [`069699a`](https://github.com/apache/spark/commit/069699af84e2ffc133a7cfa56bc682e0f62f93a8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] `


---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    Then can't we implement `equals` correctly for the coordinator?


---



[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...

Posted by cenyuhai <gi...@git.apache.org>.
Github user cenyuhai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19756#discussion_r151615572
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -109,3 +109,67 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
         }
       }
     }
    +
    +/**
    + * Find out duplicated coordinated exchanges in the spark plan, then use the same exchange for all
    + * the references.
    + */
    +case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] {
    +
    +  // Returns true if a SparkPlan has coordinated ShuffleExchangeExec children.
    +  private def hasCoordinatedExchanges(plan: SparkPlan): Boolean = {
    +    plan.children.nonEmpty && plan.children.forall {
    +      case ShuffleExchangeExec(_, _, Some(_)) => true
    +      case _ => false
    +    }
    +  }
    +
    +  // Returns true if two sequences of exchanges are producing the same results.
    +  private def hasExchangesWithSameResults(
    +      source: Seq[ShuffleExchangeExec],
    +      target: Seq[ShuffleExchangeExec]): Boolean = {
    +    source.length == target.length &&
    +      source.zip(target).forall(x => x._1.withoutCoordinator.sameResult(x._2.withoutCoordinator))
    +  }
    +
    +  type CoordinatorSignature = (Int, Long, Option[Int])
    +
    +  def apply(plan: SparkPlan): SparkPlan = {
    +    if (!conf.exchangeReuseEnabled) {
    --- End diff --
    
    ok


---



[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...

Posted by cenyuhai <gi...@git.apache.org>.
Github user cenyuhai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19756#discussion_r151612678
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -109,3 +109,67 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
         }
       }
     }
    +
    +/**
    + * Find out duplicated coordinated exchanges in the spark plan, then use the same exchange for all
    + * the references.
    + */
    +case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] {
    +
    +  // Returns true if a SparkPlan has coordinated ShuffleExchangeExec children.
    +  private def hasCoordinatedExchanges(plan: SparkPlan): Boolean = {
    +    plan.children.nonEmpty && plan.children.forall {
    +      case ShuffleExchangeExec(_, _, Some(_)) => true
    +      case _ => false
    +    }
    +  }
    +
    +  // Returns true if two sequences of exchanges are producing the same results.
    +  private def hasExchangesWithSameResults(
    +      source: Seq[ShuffleExchangeExec],
    +      target: Seq[ShuffleExchangeExec]): Boolean = {
    +    source.length == target.length &&
    +      source.zip(target).forall(x => x._1.withoutCoordinator.sameResult(x._2.withoutCoordinator))
    +  }
    +
    +  type CoordinatorSignature = (Int, Long, Option[Int])
    +
    +  def apply(plan: SparkPlan): SparkPlan = {
    +    if (!conf.exchangeReuseEnabled) {
    --- End diff --
    
    I don't know whether Spark 2.2 still has this bug. I am using Spark 2.1.


---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    ping @cloud-fan @hvanhovell 


---



[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19756#discussion_r153065340
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
    @@ -108,6 +108,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
         EnsureRequirements(sparkSession.sessionState.conf),
         CollapseCodegenStages(sparkSession.sessionState.conf),
         ReuseExchange(sparkSession.sessionState.conf),
    +    ReuseExchangeWithCoordinator(sparkSession.sessionState.conf),
    --- End diff --
    
    At this point, the exchanges are not registered yet. Registration of exchanges happens after all query transformations.


---



[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19756#discussion_r151614576
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -109,3 +109,67 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
         }
       }
     }
    +
    +/**
    + * Find out duplicated coordinated exchanges in the spark plan, then use the same exchange for all
    + * the references.
    + */
    +case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] {
    +
    +  // Returns true if a SparkPlan has coordinated ShuffleExchangeExec children.
    +  private def hasCoordinatedExchanges(plan: SparkPlan): Boolean = {
    +    plan.children.nonEmpty && plan.children.forall {
    +      case ShuffleExchangeExec(_, _, Some(_)) => true
    +      case _ => false
    +    }
    +  }
    +
    +  // Returns true if two sequences of exchanges are producing the same results.
    +  private def hasExchangesWithSameResults(
    +      source: Seq[ShuffleExchangeExec],
    +      target: Seq[ShuffleExchangeExec]): Boolean = {
    +    source.length == target.length &&
    +      source.zip(target).forall(x => x._1.withoutCoordinator.sameResult(x._2.withoutCoordinator))
    +  }
    +
    +  type CoordinatorSignature = (Int, Long, Option[Int])
    +
    +  def apply(plan: SparkPlan): SparkPlan = {
    +    if (!conf.exchangeReuseEnabled) {
    --- End diff --
    
    The posted partial query plan looks suspect too. You can check my reply there.


---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    My first thought was also to fix `Exchange.sameResult`, but I soon realized that it is not the right fix. The results of a coordinated exchange depend on all the exchanges coordinated by the same coordinator. Even if `Exchange.sameResult` returns true for two exchanges once their coordinators are excluded, the coordinated shuffle results differ if the two exchanges have different sibling exchanges.
    
    For example, consider two groups of coordinated exchanges:
    
    ```
    RootNode
    -Node 1
    -- Exchange A (coordinator 1)
    -- Exchange B (coordinator 1)
    ...
    -Node 2
    -- Exchange C (coordinator 2)
    -- Exchange D (coordinator 2)
    -- Exchange E (coordinator 2)
    ```
    
    Say exchanges A and D have the same result if we don't consider the coordinator. In this case we can't replace exchange D with exchange A, because exchange D's shuffle partitions might differ from exchange A's due to coordination.
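    
    The dependence on sibling exchanges can be illustrated with a simplified model of coordination. This sketch is an assumption-laden toy, not Spark's `ExchangeCoordinator` code: the object name `CoordinationSketch`, the function `partitionStartIndices`, and the packing rule (sum each pre-shuffle partition's size across all coordinated exchanges, then pack contiguous partitions until the target size would be exceeded) are chosen here only for illustration.
    
    ```scala
    object CoordinationSketch {
      // Each inner Seq holds the byte size of every pre-shuffle partition of one
      // exchange. Sizes are summed across ALL exchanges of the coordinator, so
      // the result depends on the siblings, not on a single exchange alone.
      def partitionStartIndices(
          sizesPerExchange: Seq[Seq[Long]],
          targetSize: Long): Seq[Int] = {
        require(sizesPerExchange.nonEmpty, "need at least one exchange")
        val numPartitions = sizesPerExchange.head.length
        require(sizesPerExchange.forall(_.length == numPartitions),
          "all exchanges must have the same number of pre-shuffle partitions")
    
        val starts = scala.collection.mutable.ArrayBuffer(0)
        var current = 0L
        for (i <- 0 until numPartitions) {
          val combined = sizesPerExchange.map(_(i)).sum
          // Start a new post-shuffle partition when adding this one would
          // push the running total past the target size.
          if (i > 0 && current + combined > targetSize) {
            starts += i
            current = 0L
          }
          current += combined
        }
        starts.toList
      }
    }
    ```
    
    With identical per-partition sizes for A and D, say `Seq(10L, 10L, 10L, 10L)` for every exchange and a target of 50, the group {A, B} and the group {C, D, E} end up with different start indices in this model, so A's post-shuffle partitions cannot stand in for D's.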
    
    
    
    
    
    



---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    ping @cloud-fan 


---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    ping @cloud-fan Please take a look. Thanks.


---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...

Posted by viirya <gi...@git.apache.org>.
Github user viirya closed the pull request at:

    https://github.com/apache/spark/pull/19756


---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    can we run `ReuseExchange` twice?


---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    @cloud-fan Do you have some other comments on this? Or shall we close this?


---



[GitHub] spark pull request #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19756#discussion_r151612155
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -109,3 +109,67 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
         }
       }
     }
    +
    +/**
    + * Find out duplicated coordinated exchanges in the spark plan, then use the same exchange for all
    + * the references.
    + */
    +case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] {
    +
    +  // Returns true if a SparkPlan has coordinated ShuffleExchangeExec children.
    +  private def hasCoordinatedExchanges(plan: SparkPlan): Boolean = {
    +    plan.children.nonEmpty && plan.children.forall {
    +      case ShuffleExchangeExec(_, _, Some(_)) => true
    +      case _ => false
    +    }
    +  }
    +
    +  // Returns true if two sequences of exchanges are producing the same results.
    +  private def hasExchangesWithSameResults(
    +      source: Seq[ShuffleExchangeExec],
    +      target: Seq[ShuffleExchangeExec]): Boolean = {
    +    source.length == target.length &&
    +      source.zip(target).forall(x => x._1.withoutCoordinator.sameResult(x._2.withoutCoordinator))
    +  }
    +
    +  type CoordinatorSignature = (Int, Long, Option[Int])
    +
    +  def apply(plan: SparkPlan): SparkPlan = {
    +    if (!conf.exchangeReuseEnabled) {
    --- End diff --
    
    I replied on that JIRA ticket. I doubt that the reported issue still exists in 2.2.


---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    cc @cloud-fan for review.


---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    **[Test build #83894 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83894/testReport)** for PR 19756 at commit [`069699a`](https://github.com/apache/spark/commit/069699af84e2ffc133a7cfa56bc682e0f62f93a8).


---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    ping @cloud-fan 


---



[GitHub] spark issue #19756: [SPARK-22527][SQL] Reuse coordinated exchanges if possib...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/19756
  
    Do you mean we manually run `ReuseExchange` after the registration of exchanges?


---
