Posted to reviews@spark.apache.org by viirya <gi...@git.apache.org> on 2018/06/05 10:14:01 UTC
[GitHub] spark pull request #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Un...
GitHub user viirya opened a pull request:
https://github.com/apache/spark/pull/21498
[SPARK-24410][SQL][Core][WIP] Optimization for Union outputPartitioning
## What changes were proposed in this pull request?
Currently `Union` only reports an unknown output partitioning. As a result, even if the children are bucketed tables, we still shuffle the union result when we run an aggregation on it.
This patch tries to better decide output partitioning for `Union` operator.
This patch adds a private API `zipPartitions` to `RDD`. Since the existing `zipPartitions` takes a function that runs on the elements of the zipped RDDs, it only supports a fixed number of RDDs. For `Union`, however, the number of children is not fixed.
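To make the idea concrete, here is a minimal plain-Scala sketch (no Spark dependency) that models each child as a vector of partitions and contrasts a concatenating union with a partition-wise one over any number of children. The names `partitionByKey`, `concatUnion`, and `zippedUnion` are hypothetical and are not taken from the patch.

```scala
object UnionPartitioningSketch {
  // Model: a partitioned dataset is a vector of partitions.
  type Partitioned[T] = Vector[Vector[T]]

  // Hypothetical helper: place each key in partition (key mod numPartitions).
  def partitionByKey(keys: Seq[Int], numPartitions: Int): Partitioned[Int] = {
    val grouped = keys.groupBy(k => Math.floorMod(k, numPartitions))
    Vector.tabulate(numPartitions)(p => grouped.getOrElse(p, Seq.empty[Int]).toVector)
  }

  // Concatenating union: child partitions are appended one after another,
  // so the same key may end up in several output partitions.
  def concatUnion[T](children: Seq[Partitioned[T]]): Partitioned[T] =
    children.flatten.toVector

  // Partition-wise union: output partition i holds partition i of every child,
  // so the children's common partitioning is preserved.
  def zippedUnion[T](children: Seq[Partitioned[T]]): Partitioned[T] = {
    require(children.nonEmpty, "need at least one child")
    val n = children.head.length
    require(children.forall(_.length == n), "children must have the same number of partitions")
    Vector.tabulate(n)(i => children.flatMap(child => child(i)).toVector)
  }
}
```

In this model, two children with 5 partitions each produce 10 output partitions under `concatUnion` and 5 under `zippedUnion`, and only the latter keeps every key in the partition its hash maps to.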
## How was this patch tested?
TBD.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/viirya/spark-1 SPARK-24410
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/21498.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #21498
----
commit 9f25bd19c4802a59039cef6f006f6c6e0802e01d
Author: Liang-Chi Hsieh <vi...@...>
Date: 2018-06-05T10:05:27Z
Optimization for Union outputPartitioning.
----
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/21498
> In theory, this should be done in a cost-based style. Changing how union combines data will reduce the parallelism.
> For example, suppose we union 2 tables, each with 5 partitions. Without this PR we will launch 10 tasks to process the data, and locality should be easy to satisfy. After this PR, we only launch 5 tasks, locality is hard to meet, and we may have extra data transfer.
Yes. The added `ZippedPartitionsRDD` for zipping RDDs works similarly to `PartitionerAwareUnionRDD`: the preferred location for each partition will be the most common preferred location among the zipped partitions.
It would be good to have a smarter solution that can make a better choice between avoiding the shuffle and preserving locality/parallelism.
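As an illustration of the "most common preferred location" rule, the following plain-Scala sketch counts how often each host appears among the child partitions being zipped and returns the most frequent ones. It is only a model of the rule described above, not Spark's code; `mostCommonLocations` and the host names are made up for the example.

```scala
object PreferredLocationSketch {
  // childLocations holds, for one zipped output partition, the preferred
  // hosts of the corresponding partition of each child.
  // Returns the host(s) that occur most often, sorted for determinism.
  def mostCommonLocations(childLocations: Seq[Seq[String]]): Seq[String] = {
    val counts = childLocations.flatten
      .groupBy(identity)
      .map { case (host, hits) => host -> hits.size }
    if (counts.isEmpty) Seq.empty[String]
    else {
      val best = counts.values.max
      counts.collect { case (host, n) if n == best => host }.toSeq.sorted
    }
  }
}
```

When the children's partitions prefer different hosts, at most some of them can be read locally, which is the locality concern raised in the quoted comment.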
---
[GitHub] spark pull request #21498: [SPARK-24410][SQL][Core] Optimization for Union o...
Posted by viirya <gi...@git.apache.org>.
Github user viirya closed the pull request at:
https://github.com/apache/spark/pull/21498
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21498
**[Test build #91493 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91493/testReport)** for PR 21498 at commit [`b058f89`](https://github.com/apache/spark/commit/b058f892af8204cb25a8daa1f1cd1a6de21c5fd6).
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/21498
@mgaido91 WDYT? Does the benchmark make sense to you?
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3812/
Test PASSed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3818/
Test PASSed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21498
**[Test build #91495 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91495/testReport)** for PR 21498 at commit [`0dedf44`](https://github.com/apache/spark/commit/0dedf44559fb6da11c5d903c51bb73f5f508fe6f).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/21498
When the condition is satisfied and we know the children of Union have the same partitioning, this makes the first partition of the union result include the first partitions of the children RDDs, and likewise for the 2nd, 3rd partitions, and so on.
I'm not sure in which part the data transfer might happen.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3819/
Test FAILed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91482/
Test FAILed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3821/
Test PASSed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Merged build finished. Test FAILed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Merged build finished. Test FAILed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/21498
I also ran this benchmark on a Spark cluster with 5 nodes on EC2.
```scala
// Time a single invocation of `func` and print the elapsed wall-clock time.
def benchmark(func: () => Unit): Unit = {
  val t0 = System.nanoTime()
  func()
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) + "ns")
}
val N = 10000
val data = (0 until N).map { i =>
  (i, i % 2, i % 3, Array.fill(10)(i), Array.fill(10)(i.toString), Array.fill(10)(i.toDouble), (i, i.toString, i.toDouble))
}
val df1 = sc.parallelize(data).toDF("key", "t1", "t2", "t3", "t4", "t5", "t6").repartition($"key")
val df2 = sc.parallelize(data).toDF("key", "t1", "t2", "t3", "t4", "t5", "t6").repartition($"key")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.unionInSamePartition", "true")
val df3 = df1.union(df2).groupBy("key").agg(count("*"))
val df4 = df1.union(df2)
val df5 = df3.sample(0.8).filter($"key" > 1000000).sample(0.4)
val df6 = df4.sample(0.8).filter($"key" > 1000000).sample(0.4)
benchmark(() => df3.collect)
benchmark(() => df4.collect)
benchmark(() => df5.collect)
benchmark(() => df6.collect)
```
Before:
```scala
scala> benchmark(() => df3.collect)
Elapsed time: 663668585ns
scala> benchmark(() => df4.collect)
Elapsed time: 547487953ns
scala> benchmark(() => df5.collect)
Elapsed time: 712634187ns
scala> benchmark(() => df6.collect)
Elapsed time: 491917400ns
```
After:
```scala
scala> benchmark(() => df3.collect)
Elapsed time: 516797788ns
scala> benchmark(() => df4.collect)
Elapsed time: 557499803ns
scala> benchmark(() => df5.collect)
Elapsed time: 611327782ns
scala> benchmark(() => df6.collect)
Elapsed time: 495387557ns
```
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21498
**[Test build #91493 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91493/testReport)** for PR 21498 at commit [`b058f89`](https://github.com/apache/spark/commit/b058f892af8204cb25a8daa1f1cd1a6de21c5fd6).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/21498
Tests are added. cc @kiszk @mgaido91
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91496/
Test FAILed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/21498
I'd like to close this for now and wait for the necessary changes to statistics.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:
https://github.com/apache/spark/pull/21498
Thanks for your benchmark, @viirya. The performance improvement is noticeable, and there seems to be no performance regression in the other case. Can we also have a similar benchmark with records that have a more complex schema? Thanks.
---
[GitHub] spark pull request #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Un...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21498#discussion_r193263354
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1099,6 +1099,17 @@ object SQLConf {
.intConf
.createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
+ val UNION_IN_SAME_PARTITION =
+ buildConf("spark.sql.unionInSamePartition")
+ .internal()
+ .doc("When true, Union operator will union children results in the same corresponding " +
+ "partitions if they have same partitioning. This eliminates unnecessary shuffle in later " +
+ "operators like aggregation. Note that because non-deterministic functions such as " +
+ "monotonically_increasing_id are depended on partition id. By doing this, the values of " +
--- End diff --
It seems we want to make sure non-deterministic functions return the same values after a union. Once we union children within the same partitions, the values of such functions can change, so I added this config to control the behavior. It defaults to false.
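To illustrate why partition-dependent functions are affected, here is a small plain-Scala model of the documented layout of `monotonically_increasing_id` (partition ID in the upper 31 bits, record number within the partition in the lower 33 bits). The object and method names are hypothetical, and the sketch assumes the ID is evaluated on the output of the union.

```scala
object PartitionDependentIdSketch {
  // Models the documented bit layout: partition ID in the upper 31 bits,
  // row number within the partition in the lower 33 bits.
  def id(partitionId: Int, rowInPartition: Long): Long =
    (partitionId.toLong << 33) + rowInPartition

  // Two children with 2 partitions of 2 rows each; look at the first row
  // of the second child's first partition.
  // Concatenating union: that partition becomes output partition 2, row 0.
  val idWithConcatUnion: Long = id(partitionId = 2, rowInPartition = 0L)
  // Partition-wise union: it is appended to output partition 0, as row 2.
  val idWithZippedUnion: Long = id(partitionId = 0, rowInPartition = 2L)
}
```

The same input row therefore gets a different ID depending on how the union lays out its partitions, which is why the behavior is guarded by a config that is off by default.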
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21498
**[Test build #91497 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91497/testReport)** for PR 21498 at commit [`69a7066`](https://github.com/apache/spark/commit/69a70662c5d668ecaed3c8c5f0ecc53f33ab0682).
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21498
**[Test build #91482 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91482/testReport)** for PR 21498 at commit [`9f25bd1`](https://github.com/apache/spark/commit/9f25bd19c4802a59039cef6f006f6c6e0802e01d).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/21498
cc @cloud-fan I haven't added tests yet, but I would like to see whether this approach looks good to you first. Thanks.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/21498
I set up a Spark cluster with 5 nodes on EC2.
```scala
// Time a single invocation of `func` and print the elapsed wall-clock time.
def benchmark(func: () => Unit): Unit = {
  val t0 = System.nanoTime()
  func()
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) + "ns")
}
val N = 100000000L
spark.range(N).selectExpr("id as key", "id % 2 as t1", "id % 3 as t2").repartition(col("key")).write.mode("overwrite").bucketBy(3, "key").sortBy("t1").saveAsTable("a1")
spark.range(N).selectExpr("id as key", "id % 2 as t1", "id % 3 as t2").repartition(col("key")).write.mode("overwrite").bucketBy(3, "key").sortBy("t1").saveAsTable("a2")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
val df = sql("select key,count(*) from (select * from a1 union all select * from a2)z group by key")
val df2 = sql("select * from a1 union all select * from a2")
val df3 = df.sample(0.8).filter($"key" > 1000000).sample(0.4)
val df4 = df2.sample(0.8).filter($"key" > 1000000).sample(0.4)
benchmark(() => df.collect)
benchmark(() => df2.collect)
benchmark(() => df3.collect)
benchmark(() => df4.collect)
```
Before:
```
scala> benchmark(() => df.collect)
Elapsed time: 371007018ns
scala> benchmark(() => df2.collect)
Elapsed time: 93056619ns
scala> benchmark(() => df3.collect)
Elapsed time: 477603242ns
scala> benchmark(() => df4.collect)
Elapsed time: 150106354ns
```
After:
```
scala> benchmark(() => df.collect)
Elapsed time: 101199791ns
scala> benchmark(() => df2.collect)
Elapsed time: 80275158ns
scala> benchmark(() => df3.collect)
Elapsed time: 292775244ns
scala> benchmark(() => df4.collect)
Elapsed time: 151129518ns
```
It improves the queries of `df` and `df3` by eliminating shuffle. For `df2` and `df4` which don't involve shuffle, there is no performance regression.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21498
**[Test build #91482 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91482/testReport)** for PR 21498 at commit [`9f25bd1`](https://github.com/apache/spark/commit/9f25bd19c4802a59039cef6f006f6c6e0802e01d).
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21498
**[Test build #91496 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91496/testReport)** for PR 21498 at commit [`6f487c9`](https://github.com/apache/spark/commit/6f487c9a9288176b7fcc574021abff6ae403895b).
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:
https://github.com/apache/spark/pull/21498
@viirya sorry, I somehow missed your updated benchmark. Yes, it makes sense. In the case where no shuffle is needed after the union, we have about a 2% performance regression. I am not sure about the reliability of the tests with `sample`, as they may return a different number of rows IIUC. Can we remove the two sample operations and leave just the filter?
Moreover, I think it would also be interesting to understand how much time is spent in collecting, for instance. If the time to collect the data to the driver is very high, then the performance regression in the rest of the query would be much higher in percentage terms. Honestly, though, I am not sure how to estimate it properly. Do you have any idea about this?
@cloud-fan @kiszk what do you think?
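One way to reduce run-to-run noise in such measurements is to repeat each query several times after a warm-up and compare medians rather than single runs. The helper below is only a sketch in plain Scala; `timeRuns` and `median` are hypothetical names, and the warm-up and run counts are arbitrary choices.

```scala
object RepeatedBenchmarkSketch {
  // Runs `func` `warmups` times untimed, then `runs` times timed.
  // Returns the elapsed times in nanoseconds, sorted ascending.
  def timeRuns(warmups: Int, runs: Int)(func: () => Unit): Vector[Long] = {
    require(runs > 0, "need at least one timed run")
    (0 until warmups).foreach(_ => func())
    Vector.fill(runs) {
      val t0 = System.nanoTime()
      func()
      System.nanoTime() - t0
    }.sorted
  }

  // Middle element of an already sorted vector (upper median for even sizes).
  def median(sorted: Vector[Long]): Long = sorted(sorted.length / 2)
}
```

For example, `RepeatedBenchmarkSketch.timeRuns(3, 10)(() => df.collect())` would give ten timings for the `df` query from the benchmark above. This does not separate the time spent collecting to the driver from the rest of the query.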
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91493/
Test PASSed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21498
**[Test build #91496 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91496/testReport)** for PR 21498 at commit [`6f487c9`](https://github.com/apache/spark/commit/6f487c9a9288176b7fcc574021abff6ae403895b).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21498
**[Test build #91495 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91495/testReport)** for PR 21498 at commit [`0dedf44`](https://github.com/apache/spark/commit/0dedf44559fb6da11c5d903c51bb73f5f508fe6f).
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/21498
**[Test build #91497 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91497/testReport)** for PR 21498 at commit [`69a7066`](https://github.com/apache/spark/commit/69a70662c5d668ecaed3c8c5f0ecc53f33ab0682).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Merged build finished. Test FAILed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/21498
@cloud-fan The WIP tag has been removed and this can be reviewed now. Please take a look when you are available, as I suppose you'll be busy this week. Thanks.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/21498
> In aggregation we are replacing a needed shuffle with gathering only the needed rows from the other partitions.
I don't actually know what this means. If we have decided that we don't need a shuffle because the partitioning already satisfies the requirement, I'm not sure why we would still need to gather rows from other partitions. I think it is simple: if we need rows from other partitions, we shuffle; if not, we avoid the shuffle.
But I think this is the point where our understanding differs. So, as you said, it is better to hear others' opinions.
> Probably we can wait for others' opinion, but it would be also great to have some performance tests on both cases and different scenarios in order to better evaluate this change. What do you think?
Yeah, I think so. I will have some tests.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:
https://github.com/apache/spark/pull/21498
> Because they have same partitioning, for example, I suppose that first partitions of all RDDs are located at the same place?
I really don't think so.
In aggregation we are replacing a needed shuffle with gathering only the needed rows from the other partitions. Here we are _always_ gathering the needed rows for maintaining the partitioning in order to avoid a _possible_ shuffle which may occur later. I agree that in such a situation this is an improvement, but in case a shuffle is not needed after the union I think we can have a performance regression.
Probably we can wait for others' opinion, but it would be also great to have some performance tests on both cases and different scenarios in order to better evaluate this change. What do you think?
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by mgaido91 <gi...@git.apache.org>.
Github user mgaido91 commented on the issue:
https://github.com/apache/spark/pull/21498
@viirya I may be wrong, but I am not sure about the performance improvement brought by this. The goal here is to avoid a shuffle after the `union` operator (when it is followed by operators requiring shuffles). But this actually causes the transfer of all the data (except for one RDD) over the network, as it collapses all the partitions with the same distribution into a single one, and it does this even when it is not needed, i.e. when no shuffle is required afterwards. In this case we might have a performance regression. Am I missing something?
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:
https://github.com/apache/spark/pull/21498
@mgaido91 Ok. I will try to have another one.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:
https://github.com/apache/spark/pull/21498
In theory, this should be done in a cost-based style. Changing how union combines data will reduce the parallelism.
For example, suppose we union 2 tables that each have 5 partitions. Without this PR we launch 10 tasks to process the data, and locality should be easy to satisfy. After this PR we launch only 5 tasks, and locality is hard to meet, so we may have extra data transfer.
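The task-count arithmetic in this example can be sketched with plain Scala collections. This is a toy model rather than Spark's implementation: `Table`, `plainUnion` and `zippedUnion` are hypothetical names, and a task is assumed to correspond to one output partition.

```scala
object UnionTaskCountSketch {
  // Toy model: a table is a sequence of partitions, each a sequence of rows.
  type Partition = Seq[Int]
  type Table = Seq[Partition]

  // Plain union: the children's partitions are concatenated, one task per partition.
  def plainUnion(children: Seq[Table]): Table = children.flatten

  // Zipped union: partitions with the same index are merged into one.
  // Assumes every child has the same number of partitions.
  def zippedUnion(children: Seq[Table]): Table = {
    require(children.nonEmpty && children.map(_.size).distinct.size == 1,
      "all children must have the same number of partitions")
    children.transpose.map(_.flatten)
  }

  def main(args: Array[String]): Unit = {
    val t1: Table = Seq.tabulate(5)(i => Seq(i, i + 5))
    val t2: Table = Seq.tabulate(5)(i => Seq(i + 10, i + 15))
    println(plainUnion(Seq(t1, t2)).size)  // 10
    println(zippedUnion(Seq(t1, t2)).size) // 5
  }
}
```

Both variants hold the same rows overall. The zipped form keeps each partition index aligned across children, at the cost of half as many tasks in this two-table case.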
We should move statistics to physical plan first. cc @wzhfy
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core][WIP] Optimization for Union out...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3820/
---
[GitHub] spark pull request #21498: [SPARK-24410][SQL][Core] Optimization for Union o...
Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21498#discussion_r193618338
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1099,6 +1099,17 @@ object SQLConf {
.intConf
.createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
+ val UNION_IN_SAME_PARTITION =
+ buildConf("spark.sql.unionInSamePartition")
+ .internal()
+ .doc("When true, Union operator will union children results in the same corresponding " +
+ "partitions if they have same partitioning. This eliminates unnecessary shuffle in later " +
+ "operators like aggregation. Note that because non-deterministic functions such as " +
+ "monotonically_increasing_id are depended on partition id. By doing this, the values of " +
--- End diff --
I'm not fully convinced by the reasoning for, and the behavior of, keeping the values of non-deterministic functions after a union. Take the following queries:
```scala
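import org.apache.spark.sql.functions.monotonically_increasing_id

// `spark` is the SparkSession, e.g. the one provided by spark-shell.
val df1 = spark.range(10).select(monotonically_increasing_id())
val df2 = spark.range(10).select(monotonically_increasing_id())
val union = df1.union(df2)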
```
Currently, the values of `monotonically_increasing_id` seen in `union` are the same as those returned by `df1` and `df2`. However, since these are non-deterministic functions, it still sounds reasonable for the values to change with the data layout or ordering.
Anyway, that is the current behavior, and this config requires users to enable the feature explicitly.
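To make the dependence on partition id concrete, here is a minimal sketch based on the documented layout of `monotonically_increasing_id` (partition ID in the upper 31 bits, record number within the partition in the lower 33 bits). The helper `monotonicId` is a hypothetical name for illustration, not Spark's internal code.

```scala
object MonotonicIdSketch {
  // Mirrors the documented bit layout: partition ID in the upper 31 bits,
  // per-partition record number in the lower 33 bits.
  def monotonicId(partitionId: Int, recordNumber: Long): Long =
    (partitionId.toLong << 33) + recordNumber

  def main(args: Array[String]): Unit = {
    // The same third row (record number 2) evaluated in partition 0 versus partition 1.
    println(monotonicId(0, 2L)) // 2
    println(monotonicId(1, 2L)) // 8589934594
  }
}
```

So if the same row were evaluated under a different partition index, it would get a different id, which is why the way union lays out partitions matters for such functions.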
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Merged build finished. Test PASSed.
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91495/
---
[GitHub] spark issue #21498: [SPARK-24410][SQL][Core] Optimization for Union outputPa...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/21498
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91497/
---