Posted to reviews@spark.apache.org by viirya <gi...@git.apache.org> on 2018/06/05 10:14:01 UTC

[GitHub] spark pull request #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Un...

GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/21498

    [SPARK-24410][SQL][Core][WIP] Optimization for Union outputPartitioning

    ## What changes were proposed in this pull request?
    
    Currently `Union` always reports an unknown output partitioning. As a result, even if the children are bucketed tables, we still shuffle the union result before running an aggregation on it.
    
    This patch tries to determine the output partitioning of the `Union` operator more precisely.
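    The decision can be shown with a toy model in plain Scala, with no Spark dependency. If every child is hash-partitioned on the same keys into the same number of partitions, the union can report that partitioning, and a later aggregation on those keys needs no shuffle. `Partitioning`, `HashPartitioning`, `UnknownPartitioning`, `unionOutputPartitioning` and `needsShuffle` are hypothetical stand-ins loosely named after Catalyst concepts. They are not Spark's classes, and the real planner compares attributes semantically rather than by column name.

    ```scala
    // Hypothetical, simplified model; these are NOT Spark's Catalyst classes.
    object UnionPartitioningSketch {
      sealed trait Partitioning { def numPartitions: Int }

      // Rows are hash-partitioned on `keys` into `numPartitions` partitions.
      final case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning

      // Nothing is known about which partition holds a given key.
      final case class UnknownPartitioning(numPartitions: Int) extends Partitioning

      // If all children share the same hash partitioning, the union can keep it
      // (by unioning partition i of every child into output partition i);
      // otherwise the children's partitions are concatenated and nothing is known.
      def unionOutputPartitioning(children: Seq[Partitioning]): Partitioning =
        children.headOption match {
          case Some(first: HashPartitioning) if children.forall(_ == first) => first
          case _ => UnknownPartitioning(children.map(_.numPartitions).sum)
        }

      // A group-by on `groupingKeys` can skip the shuffle when rows are already
      // hash-partitioned on a subset of the grouping keys.
      def needsShuffle(p: Partitioning, groupingKeys: Seq[String]): Boolean = p match {
        case HashPartitioning(keys, _) => !keys.forall(groupingKeys.contains)
        case UnknownPartitioning(_)    => true
      }
    }
    ```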
    
    This patch adds a private API `zipPartitions` to `RDD`. The existing `zipPartitions` methods take a function over the partition iterators of the RDDs being zipped, so they only support a fixed number of RDDs. For `Union`, however, the number of children is not fixed.
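    The partition-wise ("zipped") union can be sketched by modelling an RDD as a vector of partitions, so the code runs without Spark. `concatUnion` and `zipUnion` are hypothetical names. This is not the private `zipPartitions` API from the patch. It only shows why `Union` needs a zip over a `Seq` of children rather than a fixed arity.

    ```scala
    // Hypothetical model: an "RDD" is a vector of partitions, each a vector of rows.
    object ZipUnionSketch {
      type Partitions[A] = Vector[Vector[A]]

      // Plain union: the children's partitions are concatenated,
      // so two 5-partition inputs give 10 partitions (10 tasks).
      def concatUnion[A](children: Seq[Partitions[A]]): Partitions[A] =
        children.toVector.flatten

      // Zipped union over any number of children: output partition i holds
      // partition i of every child, so two 5-partition inputs give 5 partitions.
      def zipUnion[A](children: Seq[Partitions[A]]): Partitions[A] = {
        require(children.nonEmpty, "need at least one child")
        val n = children.head.size
        require(children.forall(_.size == n), "all children must have the same number of partitions")
        Vector.tabulate(n)(i => children.toVector.flatMap(child => child(i)))
      }
    }
    ```

    With two 5-partition children, `concatUnion` produces 10 partitions and `zipUnion` produces 5. This is the parallelism trade-off discussed later in this thread.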
    
    ## How was this patch tested?
    
    TBD.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 SPARK-24410

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21498.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21498
    
----
commit 9f25bd19c4802a59039cef6f006f6c6e0802e01d
Author: Liang-Chi Hsieh <vi...@...>
Date:   2018-06-05T10:05:27Z

    Optimization for Union outputPartitioning.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    > In theory, this should be done in a cost-based style. Changing how union combines data will reduce the parallelism.
    > For example, if we union 2 tables that each have 5 partitions, without this PR we launch 10 tasks to process the data, and locality should be easy to satisfy. After this PR, we only launch 5 tasks, locality is hard to meet, and we may have extra data transfer.
    
    Yes. The added `ZippedPartitionsRDD` for zipping RDDs works similarly to `PartitionerAwareUnionRDD`: the preferred location of each partition is the most common preferred location among the zipped partitions.
    
    It would be better to have a smarter solution that can choose between a shuffle and locality/parallelism.
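    The following standalone sketch implements the locality rule described above: pick the host preferred by the largest number of zipped parent partitions. It assumes each parent partition reports a list of preferred host names. `preferredLocation` is a hypothetical helper that follows the same idea as `PartitionerAwareUnionRDD` but is not copied from it. The tie-break by host name is an assumption added here for determinism.

    ```scala
    object PreferredLocationSketch {
      // `parentLocations(j)` = preferred hosts of the j-th parent partition that
      // is zipped into this output partition. Returns the host that the most
      // parents prefer; ties are broken by host name (an assumption).
      def preferredLocation(parentLocations: Seq[Seq[String]]): Option[String] = {
        val hosts = parentLocations.flatten
        if (hosts.isEmpty) None
        else {
          val counts = hosts.groupBy(identity).map { case (h, occurrences) => h -> occurrences.size }
          Some(counts.toSeq.minBy { case (h, c) => (-c, h) }._1)
        }
      }
    }
    ```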
    
    



---



[GitHub] spark pull request #21498: [SPARK-24410][SQL][Core] Optimization for Union o...

Posted by viirya <gi...@git.apache.org>.
Github user viirya closed the pull request at:

    https://github.com/apache/spark/pull/21498


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    **[Test build #91493 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91493/testReport)** for PR 21498 at commit [`b058f89`](https://github.com/apache/spark/commit/b058f892af8204cb25a8daa1f1cd1a6de21c5fd6).


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    @mgaido91 WDYT? Does the benchmark make sense to you?


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3812/
    Test PASSed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3818/
    Test PASSed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    **[Test build #91495 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91495/testReport)** for PR 21498 at commit [`0dedf44`](https://github.com/apache/spark/commit/0dedf44559fb6da11c5d903c51bb73f5f508fe6f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    When the condition is satisfied and we know the children of `Union` have the same partitioning, this makes the first partition of the union result contain the first partitions of all child RDDs, the second partition contain all the second partitions, and so on.
    
    I'm not sure where the data transfer might happen.
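    The following toy model checks why this layout lets an aggregation skip the shuffle. After both children are hash-partitioned the same way and partition i is unioned with partition i, every key still lives in exactly one partition. Simply concatenating the partitions splits each key across two partitions instead. The modulo hash in `partitionOf` is an assumption standing in for Spark SQL's Murmur3-based hash partitioning, and all helper names are hypothetical.

    ```scala
    object CoLocationSketch {
      type Partitions = Vector[Vector[Int]]

      // Stand-in hash partitioner (assumption; Spark SQL uses Murmur3 hashing).
      def partitionOf(key: Int, n: Int): Int = Math.floorMod(key.hashCode, n)

      def hashPartition(keys: Seq[Int], n: Int): Partitions =
        Vector.tabulate(n)(p => keys.filter(k => partitionOf(k, n) == p).toVector)

      // True iff every key appears in exactly one partition, i.e. a group-by
      // on the key can be computed partition-locally without a shuffle.
      def keysClustered(parts: Partitions): Boolean =
        parts.zipWithIndex
          .flatMap { case (rows, p) => rows.distinct.map(k => (k, p)) }
          .groupBy(_._1)
          .values
          .forall(_.size == 1)

      // count(*) per key computed inside each partition, with the partial maps
      // merged without combining counts: only correct when keys are clustered.
      def countByKeyLocally(parts: Partitions): Map[Int, Int] =
        parts.flatMap(rows => rows.groupBy(identity).map { case (k, vs) => k -> vs.size }).toMap
    }
    ```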


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3819/
    Test FAILed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91482/
    Test FAILed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3821/
    Test PASSed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    I also ran a benchmark on a Spark cluster with 5 nodes on EC2.
    
    ```scala
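    // Meant for spark-shell, where `sc`, `spark`, `$"..."` and `org.apache.spark.sql.functions._` are in scope.
    // Times a single invocation of `func` (no warm-up) and prints the elapsed wall-clock time.
    def benchmark(func: () => Unit): Unit = {
      val t0 = System.nanoTime()
      func()
      val t1 = System.nanoTime()
      println("Elapsed time: " + (t1 - t0) + "ns")
    }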
    
    val N = 10000
    
    val data = (0 until N).map { i =>
      (i, i % 2, i % 3, Array.fill(10)(i), Array.fill(10)(i.toString), Array.fill(10)(i.toDouble), (i, i.toString, i.toDouble))
    }
    
    val df1 = sc.parallelize(data).toDF("key", "t1", "t2", "t3", "t4", "t5", "t6").repartition($"key")
    val df2 = sc.parallelize(data).toDF("key", "t1", "t2", "t3", "t4", "t5", "t6").repartition($"key")
    
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    spark.conf.set("spark.sql.unionInSamePartition", "true") // internal flag added by this PR; defaults to false
    
    val df3 = df1.union(df2).groupBy("key").agg(count("*"))
    val df4 = df1.union(df2)
    val df5 = df3.sample(0.8).filter($"key" > 1000000).sample(0.4)
    val df6 = df4.sample(0.8).filter($"key" > 1000000).sample(0.4)
    
    benchmark(() => df3.collect)
    benchmark(() => df4.collect)
    benchmark(() => df5.collect)
    benchmark(() => df6.collect)
    ```
    
    Before:
    ```scala
    scala> benchmark(() => df3.collect)
    Elapsed time: 663668585ns
    scala> benchmark(() => df4.collect)
    Elapsed time: 547487953ns
    scala> benchmark(() => df5.collect)
    Elapsed time: 712634187ns
    scala> benchmark(() => df6.collect)
    Elapsed time: 491917400ns
    ```
    
    After:
    ```scala
    scala> benchmark(() => df3.collect)
    Elapsed time: 516797788ns
    scala> benchmark(() => df4.collect)
    Elapsed time: 557499803ns
    scala> benchmark(() => df5.collect)
    Elapsed time: 611327782ns
    scala> benchmark(() => df6.collect)
    Elapsed time: 495387557ns
    ```
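    For reference, the following snippet computes the relative change per query implied by the Before/After runs above. Each query was timed once, so these percentages include run-to-run noise.

    ```scala
    object BenchmarkDeltaSketch {
      // Elapsed times (ns) copied from the Before/After runs above.
      val before = Map("df3" -> 663668585L, "df4" -> 547487953L, "df5" -> 712634187L, "df6" -> 491917400L)
      val after  = Map("df3" -> 516797788L, "df4" -> 557499803L, "df5" -> 611327782L, "df6" -> 495387557L)

      // Relative change in percent; negative means the query got faster with the patch.
      def pctChange(b: Long, a: Long): Double = (a - b).toDouble / b * 100.0

      def deltas: Map[String, Double] =
        before.map { case (q, b) => q -> pctChange(b, after(q)) }
    }
    ```

    This gives roughly -22% for `df3`, +1.8% for `df4`, -14% for `df5` and +0.7% for `df6`.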


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    **[Test build #91493 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91493/testReport)** for PR 21498 at commit [`b058f89`](https://github.com/apache/spark/commit/b058f892af8204cb25a8daa1f1cd1a6de21c5fd6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Tests have been added. cc @kiszk @mgaido91 


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91496/
    Test FAILed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    I'd like to close this for now and wait for the necessary changes to statistics.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Thanks for your benchmark @viirya. The performance improvement is significant, and there seems to be no performance regression in the other case. Can we also have a similar benchmark with records that have a more complex schema? Thanks.


---



[GitHub] spark pull request #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Un...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21498#discussion_r193263354
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1099,6 +1099,17 @@ object SQLConf {
           .intConf
           .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
     
    +  val UNION_IN_SAME_PARTITION =
    +    buildConf("spark.sql.unionInSamePartition")
    +      .internal()
    +      .doc("When true, Union operator will union children results in the same corresponding " +
    +        "partitions if they have same partitioning. This eliminates unnecessary shuffle in later " +
    +        "operators like aggregation. Note that because non-deterministic functions such as " +
    +        "monotonically_increasing_id are depended on partition id. By doing this, the values of " +
    --- End diff --
    
    It seems we want to make sure that non-deterministic functions produce the same values after a union. Once we union the children within the same partitions, the values of such functions can change, so I added this config to control the behavior. It defaults to false.
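    Spark documents that the current implementation of `monotonically_increasing_id` puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The sketch below assumes the function is evaluated over the union's output partitions and compares the IDs produced by the concatenating layout with those from the zipped layout. `monotonicId` and `assignIds` are hypothetical helpers, not Spark code.

    ```scala
    object PartitionIdSensitivitySketch {
      // Mirrors the documented layout of monotonically_increasing_id:
      // partition ID in the upper 31 bits, record number in the lower 33 bits.
      def monotonicId(partitionId: Int, recordIndex: Long): Long =
        (partitionId.toLong << 33) + recordIndex

      // Assign an id to every row, given the partition layout of the union output.
      def assignIds[A](parts: Vector[Vector[A]]): Vector[(A, Long)] =
        parts.zipWithIndex.flatMap { case (rows, p) =>
          rows.zipWithIndex.map { case (row, i) => (row, monotonicId(p, i.toLong)) }
        }
    }
    ```

    With children `[[a], [b]]` and `[[c], [d]]`, row `c` lands in output partition 2 when partitions are concatenated but in partition 0 (second record) when they are zipped. It therefore gets a different ID.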
    
    
    



---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    **[Test build #91497 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91497/testReport)** for PR 21498 at commit [`69a7066`](https://github.com/apache/spark/commit/69a70662c5d668ecaed3c8c5f0ecc53f33ab0682).


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    **[Test build #91482 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91482/testReport)** for PR 21498 at commit [`9f25bd1`](https://github.com/apache/spark/commit/9f25bd19c4802a59039cef6f006f6c6e0802e01d).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    cc @cloud-fan I haven't added tests yet, but I'd like to check whether this approach looks good to you first. Thanks.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    I set up a Spark cluster with 5 nodes on EC2.
    
    ```scala
    def benchmark(func: () => Unit): Unit = {
      val t0 = System.nanoTime()
      func()
      val t1 = System.nanoTime()
      println("Elapsed time: " + (t1 - t0) + "ns")
    }
    
    val N = 100000000L
    
    spark.range(N).selectExpr("id as key", "id % 2 as t1", "id % 3 as t2").repartition(col("key")).write.mode("overwrite").bucketBy(3, "key").sortBy("t1").saveAsTable("a1")
    spark.range(N).selectExpr("id as key", "id % 2 as t1", "id % 3 as t2").repartition(col("key")).write.mode("overwrite").bucketBy(3, "key").sortBy("t1").saveAsTable("a2")
    
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    
    val df = sql("select key,count(*) from (select * from a1 union all select * from a2)z group by key")
    val df2 = sql("select * from a1 union all select * from a2")
    
    val df3 = df.sample(0.8).filter($"key" > 1000000).sample(0.4)
    val df4 = df2.sample(0.8).filter($"key" > 1000000).sample(0.4)
    
    benchmark(() => df.collect)
    benchmark(() => df2.collect)
    benchmark(() => df3.collect)
    benchmark(() => df4.collect)
    ```
    
    Before:
    ```
    scala> benchmark(() => df.collect)
    Elapsed time: 371007018ns
    scala> benchmark(() => df2.collect)
    Elapsed time: 93056619ns
    scala> benchmark(() => df3.collect)
    Elapsed time: 477603242ns
    scala> benchmark(() => df4.collect)
    Elapsed time: 150106354ns
    ```
    
    After:
    ```
    scala> benchmark(() => df.collect)
    Elapsed time: 101199791ns
    scala> benchmark(() => df2.collect)
    Elapsed time: 80275158ns
    scala> benchmark(() => df3.collect)
    Elapsed time: 292775244ns
    scala> benchmark(() => df4.collect)
    Elapsed time: 151129518ns
    ```
    
    It speeds up the `df` and `df3` queries by eliminating the shuffle. For `df2` and `df4`, which don't involve a shuffle, there is no performance regression.



---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    **[Test build #91482 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91482/testReport)** for PR 21498 at commit [`9f25bd1`](https://github.com/apache/spark/commit/9f25bd19c4802a59039cef6f006f6c6e0802e01d).


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    **[Test build #91496 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91496/testReport)** for PR 21498 at commit [`6f487c9`](https://github.com/apache/spark/commit/6f487c9a9288176b7fcc574021abff6ae403895b).


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    @viirya sorry, I somehow missed your updated benchmark. Yes, it makes sense. In the case where no shuffle is needed after the union, we have about a 2% performance regression. I am not sure about the reliability of the tests with `sample`, as IIUC they may return a different number of rows. Can we remove the two sample operations and keep just the filter?
    
    Moreover, I think it would also be interesting to understand how much time is spent on, for instance, collecting. If collecting the data to the driver takes most of the elapsed time, the regression in the rest of the query would be much higher in percentage terms. Honestly, though, I am not sure how to estimate this properly. Do you have any idea about this?
    
    @cloud-fan @kiszk  what do you think?
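    One way to address the reliability question is to time each query several times after a warm-up and report the median. Dropping the `sample` calls, as suggested above, also ensures both runs process the same rows. `MedianTimerSketch.medianNanos` is a hypothetical helper, not part of Spark.

    ```scala
    object MedianTimerSketch {
      // Runs `body` `warmup` times untimed, then `runs` times timed, and returns
      // the median elapsed time in nanoseconds (upper median for even `runs`).
      def medianNanos(runs: Int, warmup: Int = 1)(body: => Unit): Long = {
        require(runs > 0, "runs must be positive")
        (1 to warmup).foreach(_ => body)
        val samples = Vector.fill(runs) {
          val t0 = System.nanoTime()
          body
          System.nanoTime() - t0
        }.sorted
        samples(runs / 2)
      }
    }
    ```

    In `spark-shell` it could be called as `MedianTimerSketch.medianNanos(runs = 5)(df.collect())` for a DataFrame such as `df` from the benchmark. The measurement still includes the time to collect results to the driver.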


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91493/
    Test PASSed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    **[Test build #91496 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91496/testReport)** for PR 21498 at commit [`6f487c9`](https://github.com/apache/spark/commit/6f487c9a9288176b7fcc574021abff6ae403895b).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    **[Test build #91495 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91495/testReport)** for PR 21498 at commit [`0dedf44`](https://github.com/apache/spark/commit/0dedf44559fb6da11c5d903c51bb73f5f508fe6f).


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    **[Test build #91497 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91497/testReport)** for PR 21498 at commit [`69a7066`](https://github.com/apache/spark/commit/69a70662c5d668ecaed3c8c5f0ecc53f33ab0682).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    @cloud-fan I've removed the WIP tag, so this is ready for review now. Please take a look when you have time; I suppose you'll be busy this week. Thanks.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    > In aggregation we are replacing a needed shuffle with gathering only the needed rows from the other partitions.
    
    I don't actually understand what this means. If we decide we don't need a shuffle because the partitioning already satisfies the requirement, I'm not sure why we would still need to gather rows from other partitions. I think it is simple: if we need rows from other partitions, we shuffle; if not, we avoid the shuffle.
    
    But I think this is where our understanding differs. So, as you said, it is better to hear others' opinions.
    
    > Probably we can wait for others' opinion, but it would be also great to have some performance tests on both cases and different scenarios in order to better evaluate this change. What do you think?
    
    Yeah, I agree. I will run some performance tests.


---



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Merged build finished. Test PASSed.



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    > Because they have same partitioning, for example, I suppose that first partitions of all RDDs are located at the same place?
    
    I really don't think so.
    
    In aggregation we are replacing a needed shuffle with gathering only the needed rows from the other partitions. Here we are _always_ gathering the needed rows to maintain the partitioning, in order to avoid a _possible_ shuffle that may occur later. I agree that in that situation this is an improvement, but when no shuffle is needed after the union, I think we could have a performance regression.
    
    Probably we can wait for others' opinion, but it would also be great to have some performance tests on both cases and different scenarios in order to better evaluate this change. What do you think?



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    @viirya I may be wrong, but I am not sure about the performance improvement this brings. The goal here is to avoid a shuffle after the `union` operator (when it is followed by operators that require shuffles). But this actually causes all the data (except for one RDD) to be transferred over the network, since it collapses all the partitions with the same distribution into a single one, and it does so even when it is not needed, i.e. when no shuffle is required afterwards. In that case we might have a performance regression. Am I missing something?
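    The "all the data except for one RDD" estimate can be checked with a back-of-the-envelope model. This is a hypothetical sketch, not Spark code. It assumes that partition i of each child lives on a different node and that the combined task for partition i runs next to the largest of those partitions, so every other child's partition i has to cross the network.

    ```scala
    object ZipUnionTransferModel {
      // sizes(c)(i) is the size in bytes of partition i of child c (hypothetical input).
      // For each partition index, everything except the largest (local) piece is moved.
      def bytesMoved(sizes: Seq[Seq[Long]]): Long = {
        require(sizes.nonEmpty, "need at least one child")
        val numPartitions = sizes.head.length
        require(sizes.forall(_.length == numPartitions), "children must have the same number of partitions")
        (0 until numPartitions).map { i =>
          val sizesAtI = sizes.map(_(i))
          sizesAtI.sum - sizesAtI.max
        }.sum
      }
    }
    ```

    With two equally sized children, half of the combined data moves. With three, two thirds moves. A concatenating union, by contrast, can read every partition where it already lives.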



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    @mgaido91 Ok. I will try to have another one.



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    In theory, this should be decided in a cost-based way. Changing how union combines data reduces parallelism.
    
    For example, suppose we union 2 tables that each have 5 partitions. Without this PR, we launch 10 tasks to process the data, and locality should be easy to satisfy. With this PR, we launch only 5 tasks, locality is hard to satisfy, and we may have extra data transfer.
    
    We should move statistics to the physical plan first. cc @wzhfy
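    The task-count arithmetic in the example can be illustrated with a plain-Scala model that has no Spark dependency. Each table is represented as a sequence of partitions. `concatUnion` mimics how `UnionRDD` appends its parents' partitions. `zipUnion` is a hypothetical helper for the same-partition union proposed in this PR. Neither function is Spark code.

    ```scala
    object UnionTaskCountModel {
      // A table is modeled as a sequence of partitions; each partition is a sequence of rows.
      type Partitions[A] = Seq[Seq[A]]

      // Concatenating union: partitions are appended, so the number of tasks is
      // the sum of the children's partition counts.
      def concatUnion[A](children: Seq[Partitions[A]]): Partitions[A] =
        children.flatten

      // Same-partition union: partition i of the result holds partition i of every
      // child, so the number of tasks stays at the shared partition count.
      def zipUnion[A](children: Seq[Partitions[A]]): Partitions[A] = {
        require(children.nonEmpty, "need at least one child")
        val n = children.head.length
        require(children.forall(_.length == n), "children must have the same number of partitions")
        (0 until n).map(i => children.flatMap(_(i)))
      }
    }
    ```

    With two 5-partition inputs, `concatUnion` yields 10 partitions (10 tasks) and `zipUnion` yields 5. Both results contain the same rows; only the number of tasks and the data placement differ.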



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3820/



[GitHub] spark pull request #21498: [SPARK-24410][SQL][Core] Optimization for Union o...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21498#discussion_r193618338
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1099,6 +1099,17 @@ object SQLConf {
           .intConf
           .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
     
    +  val UNION_IN_SAME_PARTITION =
    +    buildConf("spark.sql.unionInSamePartition")
    +      .internal()
    +      .doc("When true, Union operator will union children results in the same corresponding " +
    +        "partitions if they have same partitioning. This eliminates unnecessary shuffle in later " +
    +        "operators like aggregation. Note that because non-deterministic functions such as " +
    +        "monotonically_increasing_id are depended on partition id. By doing this, the values of " +
    --- End diff --
    
    I'm not quite convinced by the reasoning behind keeping the values of non-deterministic functions unchanged after a union, as in the following queries:
    
    ```scala
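    import org.apache.spark.sql.functions.monotonically_increasing_id
    
    // Assumes an active SparkSession named `spark`, as in spark-shell.
    val df1 = spark.range(10).select(monotonically_increasing_id())
    val df2 = spark.range(10).select(monotonically_increasing_id())
    val union = df1.union(df2)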
    ```
    
    Currently we keep the values of `monotonically_increasing_id` returned by `df1`, `df2` and `union` the same. However, since these are non-deterministic functions, it still sounds reasonable for the values to change with the data layout or ordering.
    
    Anyway, that is the current behavior, and this config requires users to enable the feature explicitly.
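    The dependence on data layout follows from the ID format documented for `monotonically_increasing_id`. The partition ID goes in the upper 31 bits, and the record number within the partition goes in the lower 33 bits. The sketch below is a hypothetical plain-Scala model of that documented layout, not Spark's implementation. It shows that the same row gets a different value if it lands in a different partition or at a different position.

    ```scala
    object MonotonicIdModel {
      // Documented layout: partition ID in the upper 31 bits,
      // record number within the partition in the lower 33 bits.
      def id(partitionIndex: Int, recordIndex: Long): Long =
        (partitionIndex.toLong << 33) + recordIndex

      // IDs one child would assign, given the number of rows in each of its partitions.
      def idsFor(rowsPerPartition: Seq[Int]): Seq[Long] =
        rowsPerPartition.zipWithIndex.flatMap { case (rows, p) =>
          (0L until rows.toLong).map(r => id(p, r))
        }
    }
    ```

    For example, record 0 of partition 1 gets `8589934592` (2^33), whereas record 0 of partition 0 gets `0`.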




[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Merged build finished. Test PASSed.



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91495/



[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21498
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91497/

