Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/09/18 04:29:30 UTC

[GitHub] [spark] manuzhang opened a new pull request #29797: [SPARK-32932][SQL] Do not change number of partitions on RepartitionByExpression when coalescing disabled

manuzhang opened a new pull request #29797:
URL: https://github.com/apache/spark/pull/29797


   ### What changes were proposed in this pull request?
   Do not change number of partitions on RepartitionByExpression when `spark.sql.adaptive.coalescePartitions.enabled=false`.
   
   ### Why are the changes needed?
   Users usually repartition by the partition column before a dynamic partition overwrite. AQE can break this by changing the number of partitions, either when coalescing shuffle partitions or when using the local shuffle reader. That can lead to a large number of output files, even exceeding the file system limit. A simple fix is to let users disable this by setting `spark.sql.adaptive.coalescePartitions.enabled` to false.
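The scenario described above can be sketched as follows. This is only an illustration and is not taken from the PR: the data, the column names `id` and `dt`, the application name and the output path are all assumptions, and it needs Spark on the classpath. It repartitions by the partition column before a dynamic partition overwrite, with AQE on and partition coalescing off.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object DynamicOverwriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("dynamic-overwrite-sketch") // hypothetical name
      .config("spark.sql.adaptive.enabled", "true")
      // The setting this PR description proposes to honor for repartition.
      .config("spark.sql.adaptive.coalescePartitions.enabled", "false")
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .getOrCreate()
    import spark.implicits._

    // Illustrative data; "dt" plays the role of the partition column.
    val df = Seq((1, "2020-09-17"), (2, "2020-09-18")).toDF("id", "dt")

    // Repartition by the partition column so rows of one "dt" value
    // are written by the same task.
    df.repartition(col("dt"))
      .write
      .mode("overwrite")
      .partitionBy("dt")
      .parquet("/tmp/dynamic_overwrite_sketch") // hypothetical path

    spark.stop()
  }
}
```

Whether the repartition survives into the final write stage depends on the AQE rules discussed in this thread, so the file count this produces may differ between Spark versions.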
   
   ### Does this PR introduce _any_ user-facing change?
   Yes.
   
   
   ### How was this patch tested?
   Added a test.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694672668








[GitHub] [spark] AmplabJenkins commented on pull request #29797: [SPARK-32932][SQL] Do not change number of partitions on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694645141








[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698906945








[GitHub] [spark] cloud-fan commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r503329978



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -102,6 +104,16 @@ case class AdaptiveSparkPlanExec(
     OptimizeLocalShuffleReader(conf)
   )
 
+  @transient private val finalStageOptimizerRules: Seq[Rule[SparkPlan]] =

Review comment:
       It's only called once, so it can be a `def`.
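A small sketch of the difference behind this suggestion, using made-up classes rather than `AdaptiveSparkPlanExec`: a `val` is evaluated when the instance is constructed and kept as a field, while a `def` is evaluated only when called. `Counter`, `RulesAsVal`, `RulesAsDef` and the rule names are all hypothetical.

```scala
object ValVsDefSketch {
  // Hypothetical counter used only to observe when the body runs.
  final class Counter { var count = 0 }

  // With a val, the body runs once at construction and the result is stored as a field.
  class RulesAsVal(c: Counter) {
    val rules: Seq[String] = { c.count += 1; Seq("ruleA", "ruleB") }
  }

  // With a def, nothing runs until the method is called, and no result is stored.
  class RulesAsDef(c: Counter) {
    def rules: Seq[String] = { c.count += 1; Seq("ruleA", "ruleB") }
  }

  def main(args: Array[String]): Unit = {
    val c1 = new Counter
    new RulesAsVal(c1)
    println(s"val evaluated at construction: ${c1.count} time(s)")

    val c2 = new Counter
    val d = new RulesAsDef(c2)
    println(s"def evaluated before first call: ${c2.count} time(s)")
    d.rules
    println(s"def evaluated after one call: ${c2.count} time(s)")
  }
}
```

For a member that is read exactly once, the `def` form avoids holding a field that would otherwise need the `@transient` annotation.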






[GitHub] [spark] cloud-fan commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on DataWritingCommand

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r500796701



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -102,6 +103,14 @@ case class AdaptiveSparkPlanExec(
     OptimizeLocalShuffleReader(conf)
   )
 
+  @transient private val finalStageOptimizerRules: Seq[Rule[SparkPlan]] =
+    context.qe.sparkPlan match {
+      case _: DataWritingCommandExec =>

Review comment:
       See `DataSourceV2Suite`; we have a testing v2 source that supports writing.






[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-708787541








[GitHub] [spark] manuzhang commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on DataWritingCommand

Posted by GitBox <gi...@apache.org>.
manuzhang commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r497926082



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -102,6 +103,14 @@ case class AdaptiveSparkPlanExec(
     OptimizeLocalShuffleReader(conf)
   )
 
+  @transient private val finalStageOptimizerRules: Seq[Rule[SparkPlan]] =
+    context.qe.sparkPlan match {
+      case _: DataWritingCommandExec =>

Review comment:
       It seems DSv2 is not ready for write as per https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L988-L994.
   
   Meanwhile, would it be too big a change for those interfaces to extend the tagging trait?






[GitHub] [spark] AmplabJenkins commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-708787541








[GitHub] [spark] maryannxue commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
maryannxue commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-726818413


   I assume that in this case the repartition node has been optimized out. So this fix still won't cover a plain repartition without a writer, right?




[GitHub] [spark] cloud-fan commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-696000247


   > the small files issue due to removed repartition
   
   Are you talking about a different issue? IIUC the repartition (shuffle) is still there and the only problem is local shuffle reader.




[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-708870407


   **[Test build #129767 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129767/testReport)** for PR 29797 at commit [`d391269`](https://github.com/apache/spark/commit/d3912696b1ae53efaf4b154236d6b3cc046768d6).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698907659


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/129110/
   Test FAILed.




[GitHub] [spark] manuzhang commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
manuzhang commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-695993545


   @cloud-fan 
   
   > we give local shuffle reader expected parallelism
   
   Could you please elaborate on this? How will it solve the small-files issue caused by the removed repartition when writing to a dynamic partition or bucket table?




[GitHub] [spark] manuzhang commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
manuzhang commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-696667483


   I mean the physical shuffle doesn't happen, so each shuffle task will generate at most `numReducers` files. The overall number will be at most `numMappers * numReducers`.
   
   If we add a check, I'm not sure whether the local shuffle reader will ever be applied in practice. In our use cases, the target bucket tables usually have more than 1000 buckets.
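To put numbers on the estimate above, here is a small arithmetic sketch. The helper names and the figure of 500 mappers are illustrative assumptions; the sketch also assumes the repartition count equals the bucket count, so that with a regular shuffle read each reduce task writes a single file.

```scala
object OutputFileEstimate {
  // Regular shuffle read (assumption: one target bucket per reduce task),
  // so at most one file per reducer.
  def withRegularShuffleRead(numReducers: Int): Long = numReducers.toLong

  // Local shuffle read: each task reads one mapper's output, which may hold
  // rows for every reducer, giving at most numMappers * numReducers files.
  def withLocalShuffleRead(numMappers: Int, numReducers: Int): Long =
    numMappers.toLong * numReducers.toLong

  def main(args: Array[String]): Unit = {
    val numMappers = 500   // illustrative value
    val numReducers = 1000 // e.g. a table with 1000 buckets
    println(s"regular shuffle read: at most ${withRegularShuffleRead(numReducers)} files")
    println(s"local shuffle read:   at most ${withLocalShuffleRead(numMappers, numReducers)} files")
  }
}
```

Under these assumed numbers the upper bound grows from 1,000 files to 500,000, which is the kind of blow-up that can hit a file system limit.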




[GitHub] [spark] manuzhang commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
manuzhang commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r491281977



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
##########
@@ -705,7 +705,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         exchange.ShuffleExchangeExec(
           r.partitioning,
           planLater(r.child),
-          noUserSpecifiedNumPartition = r.optNumPartitions.isEmpty) :: Nil
+          noUserSpecifiedNumPartition = conf.coalesceShufflePartitionsEnabled &&
+            r.optNumPartitions.isEmpty) :: Nil

Review comment:
       @viirya Thanks for the good questions.
   
   This solution is not ideal until we find a way to skip the local shuffle reader when the partitioning doesn't match that of the dynamic partition. It's also not ideal that every AQE config is global.
   
   With `coalesceShufflePartitionsEnabled` and `localShuffleReaderEnabled`, the output partitioning of the shuffle is uncertain, which arguably contradicts the purpose of `RepartitionByExpression`.
   
   > If the user needs to coalesce shuffle partition in the query, but also needs dynamic partition overwrite?
   
   Another option is to introduce a new config specific to repartition, e.g. `spark.sql.adaptive.repartition.canChangeNumPartitions`.
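The boolean in the diff above can be read as a small truth table. The sketch below only mirrors that expression; the function parameters are stand-ins for `conf.coalesceShufflePartitionsEnabled` and `r.optNumPartitions`, and the object name is hypothetical.

```scala
object RepartitionFlagSketch {
  // Same expression as in the diff: the flag is true only when coalescing is
  // enabled AND the user gave no explicit partition count.
  def noUserSpecifiedNumPartition(
      coalesceEnabled: Boolean,
      optNumPartitions: Option[Int]): Boolean =
    coalesceEnabled && optNumPartitions.isEmpty

  def main(args: Array[String]): Unit = {
    for {
      coalesce <- Seq(true, false)
      num <- Seq(None, Some(200))
    } println(
      s"coalesceEnabled=$coalesce optNumPartitions=$num -> " +
        noUserSpecifiedNumPartition(coalesce, num))
  }
}
```

Before the change, only the second operand mattered; with it, turning coalescing off makes the flag false even when no partition count was given.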
   






[GitHub] [spark] cloud-fan commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-696670497


   OK, I get your point. The file write node works better with a certain partitioning, and the local shuffle reader breaks it.
   
   Then how about we update `InsertAdaptiveSparkPlan`, and have a way to disable the local shuffle reader if the root node is a data-writing node?
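A toy sketch of that idea, to make the shape of the check concrete. None of these types are Spark's: `Plan`, `Scan`, `Shuffle` and `WriteCommand` are hypothetical stand-ins for physical plan nodes, rules are represented by their names only, and `SomeOtherRule` is a placeholder.

```scala
object FinalStageRulesSketch {
  // Hypothetical stand-ins for physical plan nodes.
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Shuffle(child: Plan) extends Plan
  // Plays the role of a data-writing root node.
  final case class WriteCommand(child: Plan) extends Plan

  // Rules are modelled as names; "SomeOtherRule" is a placeholder.
  val allRules: Seq[String] = Seq("SomeOtherRule", "OptimizeLocalShuffleReader")

  // Drop the local shuffle reader rule when the root node writes data,
  // so the partitioning requested by the repartition is kept for the writer.
  def finalStageRules(root: Plan): Seq[String] = root match {
    case _: WriteCommand => allRules.filterNot(_ == "OptimizeLocalShuffleReader")
    case _               => allRules
  }

  def main(args: Array[String]): Unit = {
    println(finalStageRules(WriteCommand(Shuffle(Scan("t")))))
    println(finalStageRules(Shuffle(Scan("t"))))
  }
}
```

The check looks only at the root node, so a query that repartitions without writing still gets every rule.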




[GitHub] [spark] cloud-fan commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on DataWritingCommand

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r500086626



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -102,6 +103,14 @@ case class AdaptiveSparkPlanExec(
     OptimizeLocalShuffleReader(conf)
   )
 
+  @transient private val finalStageOptimizerRules: Seq[Rule[SparkPlan]] =
+    context.qe.sparkPlan match {
+      case _: DataWritingCommandExec =>

Review comment:
       File source v2 is not ready yet, but that doesn't mean DS v2 is not ready for writing. Please follow `InsertAdaptiveSparkPlan.applyInternal`.






[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-708763751


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34373/
   




[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698906945


   **[Test build #129110 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129110/testReport)** for PR 29797 at commit [`7e0d766`](https://github.com/apache/spark/commit/7e0d766b424cdcac27f4bb3b08e325886daf92b2).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698907651


   Merged build finished. Test FAILed.




[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on DataWritingCommand

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-699053951


   **[Test build #129111 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129111/testReport)** for PR 29797 at commit [`84134b0`](https://github.com/apache/spark/commit/84134b09ef5295818a32d9dc4612141fe93fa05c).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694672668








[GitHub] [spark] SparkQA removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-707089778


   **[Test build #129691 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129691/testReport)** for PR 29797 at commit [`4fdee62`](https://github.com/apache/spark/commit/4fdee62f3def039012a23c071ff7a63fd3544722).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not change number of partitions on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694645141








[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694696992


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/128851/
   Test FAILed.




[GitHub] [spark] manuzhang commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
manuzhang commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r502289280



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -102,6 +103,14 @@ case class AdaptiveSparkPlanExec(
     OptimizeLocalShuffleReader(conf)
   )
 
+  @transient private val finalStageOptimizerRules: Seq[Rule[SparkPlan]] =
+    context.qe.sparkPlan match {
+      case _: DataWritingCommandExec =>

Review comment:
       I've added a match for `V2TableWriteExec` and a unit test. I'm not sure I've got it right, as the v2 API is quite new to me. Please help review. Thanks.






[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-706172500


   **[Test build #129580 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129580/testReport)** for PR 29797 at commit [`b112133`](https://github.com/apache/spark/commit/b112133dfc671ca553a879994dba830a3c64b583).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-708871074








[GitHub] [spark] manuzhang commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
manuzhang commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-695993545


   @cloud-fan 
   
   > we give local shuffle reader expected parallelism
   
   Could you please elaborate on this? How will it solve the small-files issue caused by the removed repartition when writing to a dynamic-partition or bucketed table?




[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-706088530


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34185/
   




[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-706079697


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34185/
   




[GitHub] [spark] manuzhang commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
manuzhang commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698905857


   @cloud-fan I have redone the PR as you suggested. Please help review.




[GitHub] [spark] AmplabJenkins commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on DataWritingCommand

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698944562








[GitHub] [spark] cloud-fan commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r503138346



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1258,4 +1263,57 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-32932: Do not use local shuffle reader at final stage on write command") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val data = for (
+        i <- 1L to 10L;
+        j <- 1L to 3L
+      ) yield (i, j)
+
+      val df = data.toDF("i", "j")
+        .repartition($"j")

Review comment:
       nit: `val df = data.toDF("i", "j").repartition($"j")`






[GitHub] [spark] viirya commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r490732936



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
##########
@@ -705,7 +705,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         exchange.ShuffleExchangeExec(
           r.partitioning,
           planLater(r.child),
-          noUserSpecifiedNumPartition = r.optNumPartitions.isEmpty) :: Nil
+          noUserSpecifiedNumPartition = conf.coalesceShufflePartitionsEnabled &&
+            r.optNumPartitions.isEmpty) :: Nil

Review comment:
       Two questions so far.
   
   1. It seems to me that `RepartitionByExpression` here is not always for the dynamic partition overwrite case. With this change, every `RepartitionByExpression` is affected by the `coalesceShufflePartitionsEnabled` config.
   
   2. So users of dynamic partition overwrite need to set `conf.coalesceShufflePartitionsEnabled` to false to avoid that? What if a user needs to coalesce shuffle partitions in the query but also needs dynamic partition overwrite?
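
To make the first point concrete, here is a minimal sketch of the flag expression from the diff above, written as a plain function. The object and parameter names are hypothetical, and the two inputs stand in for `conf.coalesceShufflePartitionsEnabled` and `r.optNumPartitions`; it only illustrates the boolean logic, not how Spark evaluates it.

```scala
object CoalesceFlagSketch {
  // Mirrors the boolean expression in the diff; both inputs are plain values here,
  // not Spark's SQLConf or RepartitionByExpression (illustrative assumption).
  def noUserSpecifiedNumPartition(
      coalesceEnabled: Boolean,
      optNumPartitions: Option[Int]): Boolean =
    coalesceEnabled && optNumPartitions.isEmpty
}
```

With coalescing disabled, the flag is false for any repartition, whether or not the query involves dynamic partition overwrite, which is the global effect questioned above.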






[GitHub] [spark] cloud-fan commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-727768565


   I'm reading the classdoc of `OptimizeLocalShuffleReader`, and I do feel the design is a bit hacky. We add a local shuffle reader (LSR) if
   1. the shuffle is the root node of a query stage.
   2. the shuffle is the BHJ build side.
   
   The reason for condition 1 is that it will never introduce a shuffle. This is true, but it may change the final output partitioning, which may be bad for cases like a write command.
   
   I like the idea from @maryannxue, which is more general: 1) move the LSR rule into postStageCreationRules; and 2) make the LSR rule match an Exchange first (so condition 1 becomes: the shuffle is a direct child of an exchange). By doing this we can skip the LSR rule in the last stage, as the last stage's root node is not an exchange.
   
   I'm not very sure why the current approach can add LSR to the BHJ probe side. This seems like an accident to me, as it's not mentioned in the classdoc.
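
The proposed matching order can be sketched with a toy plan tree. Everything below is a hypothetical stand-in (`Plan`, `ShuffleStage`, `LocalReader`, `Exchange`, `Write`, `applyLsr`), not Spark's physical operators or the actual rule; it only shows that a rule keyed on "shuffle directly under an exchange" leaves a final write stage untouched.

```scala
// Toy plan nodes; illustrative stand-ins, not Spark's physical operators.
sealed trait Plan
case class Scan(name: String) extends Plan
case class ShuffleStage(child: Plan) extends Plan          // a materialized shuffle query stage
case class LocalReader(child: ShuffleStage) extends Plan   // stands in for a local shuffle reader
case class Exchange(child: Plan) extends Plan              // a new shuffle or broadcast exchange
case class Write(child: Plan) extends Plan                 // stands in for a write command

object LsrSketch {
  // Wrap a shuffle stage in a local reader only when it is the direct child of an exchange.
  def applyLsr(plan: Plan): Plan = plan match {
    case Exchange(s: ShuffleStage) => Exchange(LocalReader(s))
    case Exchange(c)               => Exchange(applyLsr(c))
    case Write(c)                  => Write(applyLsr(c))
    case other                     => other // includes a shuffle stage at the root of the last stage
  }
}
```

In this model `Write(ShuffleStage(...))` comes back unchanged, while `Exchange(ShuffleStage(...))` gains a `LocalReader`, matching the intent that the last stage is skipped because its root is not an exchange.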




[GitHub] [spark] cloud-fan commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-726512021


   Can you highlight the problem? I do see `CustomShuffleReader local` in all the build sides.




[GitHub] [spark] AmplabJenkins commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698907651








[GitHub] [spark] cloud-fan commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r503140497



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1258,4 +1263,57 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-32932: Do not use local shuffle reader at final stage on write command") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val data = for (
+        i <- 1L to 10L;
+        j <- 1L to 3L
+      ) yield (i, j)
+
+      val df = data.toDF("i", "j")
+        .repartition($"j")
+
+      withTable("t") {
+        df.write
+          .partitionBy("j")
+          .saveAsTable("t")
+        assert(spark.read.table("t").inputFiles.length == 3)

Review comment:
       it's a bit tricky to check the number of files. 3 distinct values don't always mean 3 files.
   
   Can we use `QueryExecutionListener` to catch the query plan, and check there is no local shuffle reader?
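
As a rough illustration of why the file count depends on task layout rather than only on the number of distinct values, consider the toy model below. It is a sketch under stated assumptions, not Spark's behavior: rows are split across two map tasks by the parity of `i`, a plain modulo stands in for hash partitioning on `j`, a local read is modeled as one task per map output, and each task writes one file per distinct `j` it sees. All names are hypothetical.

```scala
object FileCountSketch {
  // Rows mirror the test data: i in 1..10, j in 1..3.
  val rows: Seq[(Long, Long)] = for (i <- 1L to 10L; j <- 1L to 3L) yield (i, j)

  // Assumption: rows are split across two map tasks by the parity of i.
  def mapTask(row: (Long, Long)): Int = (row._1 % 2).toInt

  // Assumption: a simple modulo stands in for hash partitioning on j.
  def reducePartition(j: Long, numPartitions: Int): Int = (j % numPartitions).toInt

  // Each write task emits one file per distinct j it sees (file size limits ignored).
  def filesWritten(taskOf: ((Long, Long)) => Int): Int =
    rows.groupBy(taskOf).values.map(_.map(_._2).distinct.size).sum

  // Regular shuffle read: one task per reduce partition, so each j lands in one task.
  val regular: Int = filesWritten(r => reducePartition(r._2, 5))

  // Local shuffle read (as modeled here): one task per map output, so each task sees every j.
  val local: Int = filesWritten(mapTask)
}
```

Under these assumptions the regular read yields 3 files and the modeled local read yields 6, so asserting on the plan is more robust than asserting on the file count.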






[GitHub] [spark] manuzhang commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
manuzhang commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r504588984



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1258,4 +1264,67 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-32932: Do not use local shuffle reader at final stage on write command") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val data = for (
+        i <- 1L to 10L;
+        j <- 1L to 3L
+      ) yield (i, j)
+
+      val df = data.toDF("i", "j").repartition($"j")
+      var noLocalReader: Boolean = false
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+          qe.executedPlan match {
+            case plan@(_: DataWritingCommandExec | _: V2TableWriteExec) =>
+              assert(plan.asInstanceOf[UnaryExecNode].child.isInstanceOf[AdaptiveSparkPlanExec])
+              noLocalReader = collect(plan) {
+                case exec: CustomShuffleReaderExec if exec.isLocalReader => exec
+              }.isEmpty
+            case _ => // ignore other events
+          }
+        }
+        override def onFailure(funcName: String, qe: QueryExecution,
+          exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+
+      withTable("t") {
+        df.write.partitionBy("j").saveAsTable("t")
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(noLocalReader)
+        noLocalReader = false
+      }
+
+      // Test DataSource v2
+      withTempPath { f =>
+        val path = f.getCanonicalPath
+        val format = classOf[V2Source].getName

Review comment:
       Using `NoopDataSource` fails with:
   
   ```
   java.lang.IllegalArgumentException: requirement failed: The provided partitioning does not match of the table.
    - provided: j
    - table: 
   	at scala.Predef$.require(Predef.scala:281)
   	at org.apache.spark.sql.DataFrameWriter.checkPartitioningMatchesV2Table(DataFrameWriter.scala:772)
   ```






[GitHub] [spark] SparkQA removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-706060441


   **[Test build #129580 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129580/testReport)** for PR 29797 at commit [`b112133`](https://github.com/apache/spark/commit/b112133dfc671ca553a879994dba830a3c64b583).




[GitHub] [spark] maryannxue edited a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
maryannxue edited a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-730451129


   @manuzhang 
   > Coalescing won't be applied to repartition if user specifies a repartition number.
   
   It does apply when the specified number happens to be the default partition number, right?




[GitHub] [spark] manuzhang commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
manuzhang commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-729628843


   @maryannxue thanks for the thorough explanation. Let me rework this.




[GitHub] [spark] AmplabJenkins commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-706173430








[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-706088546


   Merged build finished. Test FAILed.




[GitHub] [spark] SparkQA removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on DataWritingCommand

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698914035


   **[Test build #129111 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129111/testReport)** for PR 29797 at commit [`84134b0`](https://github.com/apache/spark/commit/84134b09ef5295818a32d9dc4612141fe93fa05c).




[GitHub] [spark] cloud-fan commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r503138597



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1258,4 +1263,57 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-32932: Do not use local shuffle reader at final stage on write command") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val data = for (
+        i <- 1L to 10L;
+        j <- 1L to 3L
+      ) yield (i, j)
+
+      val df = data.toDF("i", "j")
+        .repartition($"j")
+
+      withTable("t") {
+        df.write
+          .partitionBy("j")
+          .saveAsTable("t")

Review comment:
       ditto: put in one line






[GitHub] [spark] manuzhang commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
manuzhang commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-697039400


   > Then how about we update `InsertAdaptiveSparkPlan`, and have a way to disable local shuffle reader if the root node is data writing node?
   
   That would also disable the local shuffle reader for broadcast joins. How about adding a variable `isDataWritingCommand` to the context and bypassing the local shuffle reader for a shuffle exchange when it is true?




[GitHub] [spark] manuzhang edited a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
manuzhang edited a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-726493708


   @maryannxue This could miss an LSR optimization on the ~~build~~ probe side of a leaf BHJ in a query with multiple joins, as follows.
   
   ```
   *(6) BroadcastHashJoin [b#24], [a#33], Inner, BuildLeft, false
   :- BroadcastQueryStage 7
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[3, int, false] as bigint)),false), [id=#390]
   :     +- CustomShuffleReader local
   :        +- ShuffleQueryStage 6
   :           +- Exchange hashpartitioning(b#24, 5), true, [id=#364]
   :              +- *(5) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
   :                 :- BroadcastQueryStage 4
   :                 :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#264]
   :                 :     +- CustomShuffleReader local
   :                 :        +- ShuffleQueryStage 0
   :                 :           +- Exchange hashpartitioning(key#13, 5), true, [id=#172]
   :                 :              +- *(1) Filter (isnotnull(value#14) AND (cast(value#14 as int) = 1))
   :                 :                 +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]
   :                 :                    +- Scan[obj#12]
   :                 +- ShuffleQueryStage 1
   :                    +- Exchange hashpartitioning(a#23, 5), true, [id=#179]
   :                       +- *(2) Filter (b#24 = 1)
   :                          +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
   :                             +- Scan[obj#22]
   +- *(6) BroadcastHashJoin [n#93], [a#33], Inner, BuildRight, false
      :- CustomShuffleReader local
      :  +- ShuffleQueryStage 2
      :     +- Exchange hashpartitioning(n#93, 5), true, [id=#196]
      :        +- *(3) Filter (n#93 = 1)
      :           +- *(3) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).l, true, false) AS l#94]
      :              +- Scan[obj#92]
      +- BroadcastQueryStage 5
         +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#310]
            +- CustomShuffleReader local
               +- ShuffleQueryStage 3
                  +- Exchange hashpartitioning(a#33, 5), true, [id=#210]
                     +- *(4) Filter (a#33 = 1)
                        +- *(4) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData3, true])).a AS a#33, unwrapoption(IntegerType, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData3, true])).b) AS b#34]
                           +- Scan[obj#32]
   ```




[GitHub] [spark] manuzhang commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
manuzhang commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-696667483








[GitHub] [spark] SparkQA removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694644786








[GitHub] [spark] manuzhang commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
manuzhang commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r491281977



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
##########
@@ -705,7 +705,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         exchange.ShuffleExchangeExec(
           r.partitioning,
           planLater(r.child),
-          noUserSpecifiedNumPartition = r.optNumPartitions.isEmpty) :: Nil
+          noUserSpecifiedNumPartition = conf.coalesceShufflePartitionsEnabled &&
+            r.optNumPartitions.isEmpty) :: Nil

Review comment:
       @viirya 
   This solution is not ideal until we find a way to avoid applying the local shuffle reader when the partitioning doesn't match that of the dynamic partition. It's also not ideal that every AQE config is global.
   
   With `coalesceShufflePartitionsEnabled` and `localShuffleReaderEnabled`, the output partitioning of the shuffle is uncertain, which arguably contradicts the purpose of `RepartitionByExpression`.
   
   >  If the user needs to coalesce shuffle partition in the query, but also needs dynamic partition overwrite?
   
   Another option is to introduce a new config specific to repartition, e.g. `spark.sql.adaptive.repartition.canChangeNumPartitions`.
   






[GitHub] [spark] cloud-fan commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694712789


   Seems like we should be stricter about when we apply the local shuffle reader, to make sure we can satisfy the expected parallelism.




[GitHub] [spark] maryannxue commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
maryannxue commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-730150112


   Thank you, @manuzhang! I have a local fix already, and I'll submit a PR shortly. I assume that the coalescing rule has a similar issue where a repartition shuffle with a specific number is optimized out because of another shuffle introduced by join or agg, and later that shuffle gets coalesced, which means the specified repartition number is ignored.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-707235054








[GitHub] [spark] cloud-fan commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r503140497



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1258,4 +1263,57 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-32932: Do not use local shuffle reader at final stage on write command") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val data = for (
+        i <- 1L to 10L;
+        j <- 1L to 3L
+      ) yield (i, j)
+
+      val df = data.toDF("i", "j")
+        .repartition($"j")
+
+      withTable("t") {
+        df.write
+          .partitionBy("j")
+          .saveAsTable("t")
+        assert(spark.read.table("t").inputFiles.length == 3)

Review comment:
       It's a bit tricky to check the number of files: 3 distinct values don't always mean 3 files.
   
   Can we use `QueryExecutionListener` to capture the executed plan, and check that it contains no local shuffle reader?






[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-707089778


   **[Test build #129691 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129691/testReport)** for PR 29797 at commit [`4fdee62`](https://github.com/apache/spark/commit/4fdee62f3def039012a23c071ff7a63fd3544722).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-706088546








[GitHub] [spark] AmplabJenkins commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-706088546








[GitHub] [spark] manuzhang edited a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
manuzhang edited a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-726541325


   @cloud-fan Sorry, it's the probe side: `ShuffleQueryStage 1`, which currently has a `CustomShuffleReader local` parent.




[GitHub] [spark] AmplabJenkins commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694696937








[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not change number of partitions on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694644786


   **[Test build #128851 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128851/testReport)** for PR 29797 at commit [`fc831f6`](https://github.com/apache/spark/commit/fc831f67d4a52bd148328857cf4bac640434ff2b).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694696937








[GitHub] [spark] cloud-fan commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-708914985


   thanks, merging to master!




[GitHub] [spark] AmplabJenkins commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on DataWritingCommand

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-699054908








[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-707233900


   **[Test build #129691 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129691/testReport)** for PR 29797 at commit [`4fdee62`](https://github.com/apache/spark/commit/4fdee62f3def039012a23c071ff7a63fd3544722).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on DataWritingCommand

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-699054908








[GitHub] [spark] SparkQA removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-706060441


   **[Test build #129580 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129580/testReport)** for PR 29797 at commit [`b112133`](https://github.com/apache/spark/commit/b112133dfc671ca553a879994dba830a3c64b583).




[GitHub] [spark] manuzhang commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
manuzhang commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-726541325


   @cloud-fan `ShuffleQueryStage 1`, which currently has a `CustomShuffleReader local` parent.




[GitHub] [spark] cloud-fan commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on DataWritingCommand

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r496740272



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -102,6 +103,14 @@ case class AdaptiveSparkPlanExec(
     OptimizeLocalShuffleReader(conf)
   )
 
+  @transient private val finalStageOptimizerRules: Seq[Rule[SparkPlan]] =
+    context.qe.sparkPlan match {
+      case _: DataWritingCommandExec =>

Review comment:
       We need to match all writing commands, including DS v1, DS v2, and file source. Maybe we can create a tagging trait, like `UserDefinedExpression`, to tag these writing commands.
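
A minimal sketch of the tagging-trait idea, assuming a simplified plan model: `PlanNode`, `WritesData`, `V1Write`, `V2Write`, and the two rule objects are hypothetical stand-ins for illustration, not Spark's actual classes. It only shows how one marker trait lets a single pattern match cover every kind of write command.

```scala
object FinalStageRulesSketch {
  // Hypothetical marker (tagging) trait: it has no members and exists only
  // so that every write command can be recognised with one type test.
  trait WritesData

  // Simplified stand-ins for physical plan nodes (not Spark's classes).
  sealed trait PlanNode
  case object Scan extends PlanNode
  case class Project(child: PlanNode) extends PlanNode
  case class V1Write(child: PlanNode) extends PlanNode with WritesData
  case class V2Write(child: PlanNode) extends PlanNode with WritesData

  // Simplified stand-ins for optimizer rules.
  sealed trait Rule
  case object CoalescePartitions extends Rule
  case object LocalShuffleReader extends Rule

  val queryStageRules: Seq[Rule] = Seq(CoalescePartitions, LocalShuffleReader)

  // For any plan rooted at a tagged write command, skip the local shuffle
  // reader rule so the final stage keeps the shuffle's output partitioning.
  def finalStageRules(root: PlanNode): Seq[Rule] = root match {
    case _: WritesData => queryStageRules.filterNot(_ == LocalShuffleReader)
    case _             => queryStageRules
  }
}
```

In this model, a newly added write node only needs to mix in `WritesData`; the match in `finalStageRules` does not have to change.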

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -102,6 +103,14 @@ case class AdaptiveSparkPlanExec(
     OptimizeLocalShuffleReader(conf)
   )
 
+  @transient private val finalStageOptimizerRules: Seq[Rule[SparkPlan]] =
+    context.qe.sparkPlan match {
+      case _: DataWritingCommandExec =>
+        queryStageOptimizerRules.filterNot(_.isInstanceOf[OptimizeLocalShuffleReader])

Review comment:
       let's add comments to explain it.






[GitHub] [spark] cloud-fan commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694708691


   > AQE could break it by removing shuffle with local shuffle reader.
   
   Yea, this is an issue; thanks for reporting! BTW, why is it OK when partition coalescing is enabled?




[GitHub] [spark] manuzhang commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
manuzhang commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r491281977



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
##########
@@ -705,7 +705,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         exchange.ShuffleExchangeExec(
           r.partitioning,
           planLater(r.child),
-          noUserSpecifiedNumPartition = r.optNumPartitions.isEmpty) :: Nil
+          noUserSpecifiedNumPartition = conf.coalesceShufflePartitionsEnabled &&
+            r.optNumPartitions.isEmpty) :: Nil

Review comment:
       @viirya Thanks for the good questions.
   
   This solution is not ideal until we find a way to avoid applying the local shuffle reader when the partitioning doesn't match that of the dynamic partition. It's also not ideal that every AQE config is global.
   
   With `coalesceShufflePartitionsEnabled` and `localShuffleReaderEnabled`, the output partitioning of the shuffle is uncertain, which arguably contradicts the purpose of `RepartitionByExpression`.
   
   >  If the user needs to coalesce shuffle partition in the query, but also needs dynamic partition overwrite?
   
   Another option is to introduce a new config specific to repartition, e.g. `spark.sql.adaptive.repartition.canChangeNumPartitions`.
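
For reference, the one-line change in the diff above can be read as a small predicate. The sketch below is a standalone model, not Spark code: the object and function names are hypothetical, and the parameters only mirror `conf.coalesceShufflePartitionsEnabled` and `r.optNumPartitions` from the hunk. It assumes, as the PR title suggests, that AQE may change the partition count only when the flag is true.

```scala
object RepartitionFlagSketch {
  // Before the change: the flag depends only on whether the user passed an
  // explicit partition count to repartition.
  def flagBefore(optNumPartitions: Option[Int]): Boolean =
    optNumPartitions.isEmpty

  // After the change: coalescing must also be enabled; otherwise the shuffle
  // is treated as if the user had fixed the number of partitions.
  def flagAfter(coalesceEnabled: Boolean, optNumPartitions: Option[Int]): Boolean =
    coalesceEnabled && optNumPartitions.isEmpty
}
```

Under this reading, turning coalescing off is the only way to pin the partition count for a `repartition($"j")` call that has no explicit number, which is why a dedicated repartition config is floated as an alternative.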
   






[GitHub] [spark] SparkQA removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698906945


   **[Test build #129110 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129110/testReport)** for PR 29797 at commit [`7e0d766`](https://github.com/apache/spark/commit/7e0d766b424cdcac27f4bb3b08e325886daf92b2).




[GitHub] [spark] manuzhang commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
manuzhang commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r503259010



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1258,4 +1263,57 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-32932: Do not use local shuffle reader at final stage on write command") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val data = for (
+        i <- 1L to 10L;
+        j <- 1L to 3L
+      ) yield (i, j)
+
+      val df = data.toDF("i", "j")
+        .repartition($"j")
+
+      withTable("t") {
+        df.write
+          .partitionBy("j")
+          .saveAsTable("t")
+        assert(spark.read.table("t").inputFiles.length == 3)

Review comment:
       The tests now use a `QueryExecutionListener` to check for the local shuffle reader. Please take another look.






[GitHub] [spark] maryannxue commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
maryannxue commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-726822028


   We have some fundamental problems hanging around in our optimizations. In general, when we optimize a Shuffle over another Shuffle that is semantically equivalent (almost equivalent in this case), we should get rid of the child shuffle instead of the parent one, yet it's a little hard to implement right now.
   But again, this is a "repartition" issue rather than the write command issue. Introducing this kind of hack in AQE plan does not make sense.




[GitHub] [spark] manuzhang commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
manuzhang commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-727132781


   @maryannxue thanks for pointing out the fundamental problems; I'm not familiar with the design behind them. When starting out, I was trying to solve the most urgent issue in our use cases: too many output files caused by the local shuffle reader (LSR), which broke downstream jobs in the pipeline. Meanwhile, disabling LSR entirely degrades the performance of broadcast hash joins (BHJ) converted from sort-merge joins (SMJ). Hence this PR, with the help of @cloud-fan.
   
   > But again, this is a "repartition" issue rather than the write command issue.  Introducing this kind of hack in AQE plan does not make sense.
   
   1. Is it OK, or even good, if the repartition is optimized away in cases other than a write command?
   2. Moving forward, what's your suggestion? Do you want me to revert this PR?
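
The "too many output files" symptom can be illustrated with a toy model. This is a sketch under stated assumptions, not a simulation of Spark: it assumes each write task emits one file per distinct `j` value it sees, uses modulo as a stand-in for the real hash partitioner, and uses a round-robin layout with a hypothetical `numMappers` as a stand-in for how rows sit in the upstream map tasks.

```scala
object OutputFileCountSketch {
  // Same toy data as the test in this PR: i in 1..10, j in 1..3.
  val rows: Seq[(Long, Long)] = for (i <- 1L to 10L; j <- 1L to 3L) yield (i, j)

  // With partitionBy("j"), each write task is modeled as emitting one file per
  // distinct j it sees, so the file count is the number of distinct (task, j) pairs.
  def fileCount(taskOf: ((Long, Long), Int) => Int): Int =
    rows.zipWithIndex.map { case (row, idx) => (taskOf(row, idx), row._2) }.distinct.size

  val numShufflePartitions = 5 // mirrors SHUFFLE_PARTITIONS in the test
  val numMappers = 2           // hypothetical number of upstream map tasks

  // Regular shuffle read after repartition($"j"): all rows with the same j
  // land in the same task (modulo stands in for the real hash function).
  val filesWithShuffleRead: Int =
    fileCount((row, _) => (row._2 % numShufflePartitions).toInt)

  // Local shuffle read: each task reads one mapper's output as-is, so rows
  // stay wherever they were produced (round-robin stands in for that layout).
  val filesWithLocalRead: Int =
    fileCount((_, idx) => idx % numMappers)
}
```

In this model the regular shuffle read yields 3 files (one per `j` value), while the local read yields 6 (every mapper holds every `j` value), and the gap grows with the number of map tasks.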
   




[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on DataWritingCommand

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698944534


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/33732/
   




[GitHub] [spark] cloud-fan commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r504601341



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1258,4 +1264,67 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-32932: Do not use local shuffle reader at final stage on write command") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val data = for (
+        i <- 1L to 10L;
+        j <- 1L to 3L
+      ) yield (i, j)
+
+      val df = data.toDF("i", "j").repartition($"j")
+      var noLocalReader: Boolean = false
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+          qe.executedPlan match {
+            case plan@(_: DataWritingCommandExec | _: V2TableWriteExec) =>
+              assert(plan.asInstanceOf[UnaryExecNode].child.isInstanceOf[AdaptiveSparkPlanExec])
+              noLocalReader = collect(plan) {
+                case exec: CustomShuffleReaderExec if exec.isLocalReader => exec
+              }.isEmpty
+            case _ => // ignore other events
+          }
+        }
+        override def onFailure(funcName: String, qe: QueryExecution,
+          exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+
+      withTable("t") {
+        df.write.partitionBy("j").saveAsTable("t")
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(noLocalReader)
+        noLocalReader = false
+      }
+
+      // Test DataSource v2
+      withTempPath { f =>
+        val path = f.getCanonicalPath
+        val format = classOf[V2Source].getName

Review comment:
       Whether the table is partitioned or not doesn't matter; we can test with `df.write.format(...).save()`.






[GitHub] [spark] cloud-fan closed pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #29797:
URL: https://github.com/apache/spark/pull/29797


   




[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-708688153


   **[Test build #129767 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129767/testReport)** for PR 29797 at commit [`d391269`](https://github.com/apache/spark/commit/d3912696b1ae53efaf4b154236d6b3cc046768d6).




[GitHub] [spark] cloud-fan commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-696670497


   OK, I get your point. The file write node works better with a certain partitioning, and the local shuffle reader breaks it.
   
   Then how about we update `InsertAdaptiveSparkPlan` and add a way to disable the local shuffle reader if the root node is a data writing node?
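   
   A minimal sketch of that idea, using a toy model rather than Spark's real classes. Every name below (`PlanNode`, `WriteCommand`, `Rule`, `finalStageRules`) is hypothetical and only illustrates dropping one rule when the root node writes data:
   
   ```scala
   // Toy model, not Spark's API: all types and names here are illustrative.
   object FinalStageRules {
     sealed trait PlanNode
     case object WriteCommand extends PlanNode // stands in for a data writing root node
     case object Collect extends PlanNode      // stands in for any other root node

     sealed trait Rule
     case object CoalescePartitions extends Rule
     case object LocalShuffleReader extends Rule

     // Rules applied to every query stage in this toy model.
     val stageRules: Seq[Rule] = Seq(CoalescePartitions, LocalShuffleReader)

     // Drop the local shuffle reader rule only when the root node writes data.
     def finalStageRules(root: PlanNode): Seq[Rule] = root match {
       case WriteCommand => stageRules.filterNot(_ == LocalShuffleReader)
       case _            => stageRules
     }
   }
   ```
   
   In this model, non-writing queries keep the full rule list, so only the final stage of a write loses the local reader.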




[GitHub] [spark] cloud-fan commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-695999530


   See `OptimizeLocalShuffleReader.getPartitionSpecs`:
   ```
   val expectedParallelism = advisoryParallelism.getOrElse(numReducers)
   ```
   
   If we skip the local shuffle reader, we fix the small-files issue.
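   
   To see why the expected parallelism matters, here is a simplified model of the resulting partition count. It assumes each mapper's output is read separately and split into `max(1, expectedParallelism / numMappers)` ranges; the object and method names are illustrative, and the real rule may differ in detail:
   
   ```scala
   // Simplified model, not Spark's implementation: names are illustrative.
   object LocalReaderParallelism {
     // Number of output partitions when every mapper is read locally.
     def outputPartitions(
         numMappers: Int,
         numReducers: Int,
         advisoryParallelism: Option[Int]): Int = {
       val expectedParallelism = advisoryParallelism.getOrElse(numReducers)
       if (numMappers == 0) 0
       else {
         // Each mapper contributes at least one partition, so the total can
         // exceed expectedParallelism when there are many mappers.
         val splitsPerMapper =
           math.min(numReducers, math.max(1, expectedParallelism / numMappers))
         numMappers * splitsPerMapper
       }
     }
   }
   ```
   
   Under this model, 100 mappers with 5 reducers give 100 output partitions instead of 5, which is where the extra files would come from.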




[GitHub] [spark] cloud-fan commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-695965713


   I think the root cause is that we give the local shuffle reader an expected parallelism, but it's not always satisfied. For example, too many mappers can lead to over-parallelism and many small files.
   
   We should add a check and only apply the local shuffle reader if the expected parallelism can be satisfied.
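   
   One possible reading of such a check, as a hedged sketch: if the local reader produces at least one partition per mapper, the expected parallelism can only be met when the number of mappers does not exceed it. The object and method names are hypothetical:
   
   ```scala
   // Hypothetical guard, not Spark's API: names are illustrative.
   object LocalReaderGuard {
     // With at least one output partition per mapper, the expected parallelism
     // is reachable only if numMappers <= expectedParallelism.
     def canSatisfy(numMappers: Int, expectedParallelism: Int): Boolean =
       numMappers > 0 && numMappers <= expectedParallelism
   }
   ```
   
   A rule using this guard would fall back to the regular shuffle reader whenever `canSatisfy` is false.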




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-706173430










[GitHub] [spark] cloud-fan commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r504729804



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -102,6 +104,16 @@ case class AdaptiveSparkPlanExec(
     OptimizeLocalShuffleReader(conf)
   )
 
+  private def finalStageOptimizerRules: Seq[Rule[SparkPlan]] =
+    context.qe.sparkPlan match {
+      case _: DataWritingCommandExec | _: V2TableWriteExec =>
+        // SPARK-32932: Local shuffle reader could break partitioning that works best
+        // for the following writing command
+       queryStageOptimizerRules.filterNot(_.isInstanceOf[OptimizeLocalShuffleReader])

Review comment:
       nit: indentation is one space off.






[GitHub] [spark] manuzhang commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
manuzhang commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r502289280



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -102,6 +103,14 @@ case class AdaptiveSparkPlanExec(
     OptimizeLocalShuffleReader(conf)
   )
 
+  @transient private val finalStageOptimizerRules: Seq[Rule[SparkPlan]] =
+    context.qe.sparkPlan match {
+      case _: DataWritingCommandExec =>

Review comment:
       I've added a match for `V2TableWriteExec` and a UT. I'm not sure I've got it right, as the v2 API is quite new to me. Please help review. Thanks.






[GitHub] [spark] manuzhang commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
manuzhang commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698905857


   @cloud-fan I've redone the PR as you suggested. Please help review.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-707124846








[GitHub] [spark] viirya commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r490732936



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
##########
@@ -705,7 +705,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         exchange.ShuffleExchangeExec(
           r.partitioning,
           planLater(r.child),
-          noUserSpecifiedNumPartition = r.optNumPartitions.isEmpty) :: Nil
+          noUserSpecifiedNumPartition = conf.coalesceShufflePartitionsEnabled &&
+            r.optNumPartitions.isEmpty) :: Nil

Review comment:
       Two questions so far.
   
   1. It seems to me that `RepartitionByExpression` here is not always used for the dynamic partition overwrite case. With this change, every `RepartitionByExpression` is affected by the `coalesceShufflePartitionsEnabled` config.
   
   2. So users of dynamic partition overwrite need to set `conf.coalesceShufflePartitionsEnabled` to false to avoid that? What if the user needs to coalesce shuffle partitions in the query but also needs dynamic partition overwrite?
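   
   The first question can be read off the flag expression in the diff. A minimal sketch, assuming the two inputs are modeled as a plain `Boolean` and an `Option[Int]` (the object and parameter names are illustrative, not Spark's API):
   
   ```scala
   // Mirrors the boolean expression from the diff above on plain values.
   object RepartitionFlag {
     def noUserSpecifiedNumPartition(
         coalesceEnabled: Boolean,
         optNumPartitions: Option[Int]): Boolean =
       coalesceEnabled && optNumPartitions.isEmpty
   }
   ```
   
   With `coalesceEnabled = false`, the flag is false for every repartition by expression, including those where no partition number was given and no dynamic partition overwrite is involved.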






[GitHub] [spark] manuzhang commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
manuzhang commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694671759


   cc @maryannxue @cloud-fan @JkSelf @viirya 




[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694696482








[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-707124824


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34297/
   




[GitHub] [spark] AmplabJenkins commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-707235054








[GitHub] [spark] manuzhang commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
manuzhang commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r504588984



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1258,4 +1264,67 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-32932: Do not use local shuffle reader at final stage on write command") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val data = for (
+        i <- 1L to 10L;
+        j <- 1L to 3L
+      ) yield (i, j)
+
+      val df = data.toDF("i", "j").repartition($"j")
+      var noLocalReader: Boolean = false
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+          qe.executedPlan match {
+            case plan@(_: DataWritingCommandExec | _: V2TableWriteExec) =>
+              assert(plan.asInstanceOf[UnaryExecNode].child.isInstanceOf[AdaptiveSparkPlanExec])
+              noLocalReader = collect(plan) {
+                case exec: CustomShuffleReaderExec if exec.isLocalReader => exec
+              }.isEmpty
+            case _ => // ignore other events
+          }
+        }
+        override def onFailure(funcName: String, qe: QueryExecution,
+          exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+
+      withTable("t") {
+        df.write.partitionBy("j").saveAsTable("t")
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(noLocalReader)
+        noLocalReader = false
+      }
+
+      // Test DataSource v2
+      withTempPath { f =>
+        val path = f.getCanonicalPath
+        val format = classOf[V2Source].getName

Review comment:
       Using `NoopDataSource` fails with:
   
   ```
   requirement failed: The provided partitioning does not match of the table.
    - provided: j
    - table: 
   ```






[GitHub] [spark] AmplabJenkins commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-708871074








[GitHub] [spark] AmplabJenkins commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-706088546








[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-706060441


   **[Test build #129580 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129580/testReport)** for PR 29797 at commit [`b112133`](https://github.com/apache/spark/commit/b112133dfc671ca553a879994dba830a3c64b583).




[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on DataWritingCommand

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698914035


   **[Test build #129111 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129111/testReport)** for PR 29797 at commit [`84134b0`](https://github.com/apache/spark/commit/84134b09ef5295818a32d9dc4612141fe93fa05c).




[GitHub] [spark] AmplabJenkins commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698907651








[GitHub] [spark] cloud-fan commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r503331435



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1258,4 +1264,67 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-32932: Do not use local shuffle reader at final stage on write command") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val data = for (
+        i <- 1L to 10L;
+        j <- 1L to 3L
+      ) yield (i, j)
+
+      val df = data.toDF("i", "j").repartition($"j")
+      var noLocalReader: Boolean = false
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+          qe.executedPlan match {
+            case plan@(_: DataWritingCommandExec | _: V2TableWriteExec) =>
+              assert(plan.asInstanceOf[UnaryExecNode].child.isInstanceOf[AdaptiveSparkPlanExec])
+              noLocalReader = collect(plan) {
+                case exec: CustomShuffleReaderExec if exec.isLocalReader => exec
+              }.isEmpty
+            case _ => // ignore other events
+          }
+        }
+        override def onFailure(funcName: String, qe: QueryExecution,
+          exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+
+      withTable("t") {
+        df.write.partitionBy("j").saveAsTable("t")
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(noLocalReader)
+        noLocalReader = false
+      }
+
+      // Test DataSource v2
+      withTempPath { f =>
+        val path = f.getCanonicalPath
+        val format = classOf[V2Source].getName

Review comment:
       Can we just use `NoopDataSource`?






[GitHub] [spark] SparkQA removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698906945


   **[Test build #129110 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129110/testReport)** for PR 29797 at commit [`7e0d766`](https://github.com/apache/spark/commit/7e0d766b424cdcac27f4bb3b08e325886daf92b2).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698907651








[GitHub] [spark] manuzhang commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
manuzhang commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-730193899


   @maryannxue Great to know. I've been testing a draft fix, which doesn't look good without optimizing the probe side of the BHJ. I can help test your PR. Coalescing won't be applied to a repartition if the user specifies the number of partitions.




[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-707109770


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34297/
   




[GitHub] [spark] maryannxue commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
maryannxue commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-726266910


   @manuzhang This change seems incomplete. The problem here is not about the writer, but rather about the repartition Exchange. We should avoid letting the local shuffle reader (LSR) rule work on such shuffles in general, not just in the writer case.
   All you need to do is: 1) move the LSR rule into `postStageCreationRules`; and 2) make the LSR rule match an Exchange first:
   ```
       plan match {
         case e: Exchange if canUseLocalShuffleReader(e.child) =>
           e.withNewChildren(Seq(createLocalReader(e.child)))
         case s: SparkPlan =>
           createProbeSideLocalReader(s)
       }
   ```
   So that it'll only modify the child node of an Exchange, which is the purpose of this rule in the first place.
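
As an illustration of the rule shape suggested above, here is a minimal, self-contained sketch in plain Scala. It is a toy model under stated assumptions, not Spark code: `Plan`, `ShuffleStage`, `BroadcastStage`, `LocalReader`, `BroadcastJoin`, `canUseLocalReader`, `optimizeProbeSide` and `applyRule` are hypothetical stand-ins, and the toy `Exchange` only borrows the name of the real operator. It shows that a plan rooted directly at a shuffle stage (the repartition case) is left alone, while the child of an exchange and the probe side of a broadcast join can still get a local reader.

```scala
object LsrSketch {
  // Toy plan nodes (hypothetical, not Spark's SparkPlan hierarchy).
  sealed trait Plan
  final case class ShuffleStage(id: Int) extends Plan      // a materialized shuffle
  final case class BroadcastStage(id: Int) extends Plan    // a materialized broadcast
  final case class LocalReader(child: Plan) extends Plan   // reads shuffle output locally
  final case class Exchange(child: Plan) extends Plan      // a new exchange above a stage
  final case class BroadcastJoin(build: Plan, probe: Plan) extends Plan

  // Assumption for this sketch: only a bare shuffle stage can be read locally.
  def canUseLocalReader(p: Plan): Boolean = p.isInstanceOf[ShuffleStage]

  // Probe-side optimization for a broadcast join; every other plan is returned as is.
  def optimizeProbeSide(p: Plan): Plan = p match {
    case BroadcastJoin(build, probe) if canUseLocalReader(probe) =>
      BroadcastJoin(build, LocalReader(probe))
    case other => other
  }

  // Match an exchange first, so only the child of an exchange is rewritten.
  def applyRule(plan: Plan): Plan = plan match {
    case Exchange(child) if canUseLocalReader(child) => Exchange(LocalReader(child))
    case other => optimizeProbeSide(other)
  }
}
```

In this toy model, `LsrSketch.applyRule(LsrSketch.ShuffleStage(1))` returns the stage unchanged, which is the behavior wanted for a final stage that consists only of the user's repartition shuffle.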




[GitHub] [spark] cloud-fan commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-697146931


   > That will also disable local shuffle reader for broadcast join
   
   We can just disable the local shuffle reader in the final stage if the root node is a data writing node. What do you think?
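
The idea of picking the final-stage rules from the root node can be sketched in plain Scala as follows. This is only a toy model: `Root`, `DataWritingCommand`, `OtherRoot`, `allRules` and `finalStageRules` are hypothetical names, and the rules are represented by strings rather than real `Rule[SparkPlan]` instances.

```scala
object FinalStageRulesSketch {
  // Toy classification of the query's root node (hypothetical).
  sealed trait Root
  case object DataWritingCommand extends Root
  case object OtherRoot extends Root

  // Rules are modeled as plain names for illustration only.
  val allRules: Seq[String] =
    Seq("CoalesceShufflePartitions", "OptimizeLocalShuffleReader")

  // Drop the local shuffle reader rule when the root node writes data.
  def finalStageRules(root: Root): Seq[String] = root match {
    case DataWritingCommand => allRules.filterNot(_ == "OptimizeLocalShuffleReader")
    case OtherRoot => allRules
  }
}
```

The trade-off of this design is that it keys on the writer rather than on the repartition itself, so a repartition that is not followed by a write is not protected.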




[GitHub] [spark] maryannxue commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
maryannxue commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-728143402


   In LSR, we have a sanity check to make sure that the output partitioning requirement is not broken after this rule. For example, suppose there's a parent join sitting above the current BHJ whose probe side we are trying to optimize with LSR, and that parent join relies on the output partitioning of the current BHJ. When applying the LSR rule, the check would fail, and we would back out of it.
   It's a similar situation here with repartition, except that the repartition has been optimized away. So if we can introduce an idea like "required partitioning" (and in the future maybe even required sort order) of the query, then when applying the LSR rule we would know that it could break the required partitioning. Hope that makes sense, @manuzhang 
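
The "apply, check, back out" pattern described above, extended with a query-level required partitioning, could look roughly like this in plain Scala. This is a sketch under assumptions, not the actual Spark implementation: `Partitioning`, `HashPartitioning`, `UnknownPartitioning`, `Plan`, `useLocalReader`, `satisfies` and `applyWithCheck` are hypothetical names that only mimic the concepts.

```scala
object RequiredPartitioningSketch {
  // Toy partitioning descriptions (hypothetical, not Spark's classes).
  sealed trait Partitioning
  final case class HashPartitioning(column: String, numPartitions: Int) extends Partitioning
  case object UnknownPartitioning extends Partitioning

  // A plan is reduced to the only property this sketch cares about.
  final case class Plan(description: String, outputPartitioning: Partitioning)

  // Assumption: a local reader no longer guarantees the hash partitioning.
  def useLocalReader(p: Plan): Plan =
    Plan(s"LocalReader(${p.description})", UnknownPartitioning)

  // No requirement means any output partitioning is acceptable.
  def satisfies(actual: Partitioning, required: Option[Partitioning]): Boolean =
    required.forall(_ == actual)

  // Try the rewrite; keep it only if the required partitioning still holds.
  def applyWithCheck(p: Plan, required: Option[Partitioning]): Plan = {
    val candidate = useLocalReader(p)
    if (satisfies(candidate.outputPartitioning, required)) candidate else p
  }
}
```

With a requirement of `Some(HashPartitioning("j", 5))`, as a user's `repartition($"j")` would imply in this model, `applyWithCheck` returns the original plan; with `None` it returns the local-reader version.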




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-706088560


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/34185/




[GitHub] [spark] cloud-fan commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-695965713








[GitHub] [spark] manuzhang commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
manuzhang commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694710622


   @cloud-fan 
   It's not OK, but I haven't thought of a simple way to disable the local shuffle reader on dynamic partition overwrite (or on bucket table overwrite). Any ideas?




[GitHub] [spark] maryannxue commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
maryannxue commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-730451129


   @manuzhang 
   > Coalescing won't be applied to repartition if user specifies a repartition number.
   
   It does apply when the specified number happens to be the default partition number, right?




[GitHub] [spark] manuzhang commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on DataWritingCommand

Posted by GitBox <gi...@apache.org>.
manuzhang commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r500650742



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
##########
@@ -102,6 +103,14 @@ case class AdaptiveSparkPlanExec(
     OptimizeLocalShuffleReader(conf)
   )
 
+  @transient private val finalStageOptimizerRules: Seq[Rule[SparkPlan]] =
+    context.qe.sparkPlan match {
+      case _: DataWritingCommandExec =>

Review comment:
       Sure. Is there a UT for DS v2 write? I find that only V1 is used for the write no matter which format I specify.






[GitHub] [spark] SparkQA removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-708688153


   **[Test build #129767 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129767/testReport)** for PR 29797 at commit [`d391269`](https://github.com/apache/spark/commit/d3912696b1ae53efaf4b154236d6b3cc046768d6).




[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694672314


   **[Test build #128858 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/128858/testReport)** for PR 29797 at commit [`93775e8`](https://github.com/apache/spark/commit/93775e83c7913bd4fea34714ca66409f1be52b78).




[GitHub] [spark] manuzhang commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
manuzhang commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-726493708


   @maryannxue This could miss an LSR optimization on the build side of a leaf BHJ in a multi-join query, as in the following plan.
   
   ```
   *(6) BroadcastHashJoin [b#24], [a#33], Inner, BuildLeft, false
   :- BroadcastQueryStage 7
   :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[3, int, false] as bigint)),false), [id=#390]
   :     +- CustomShuffleReader local
   :        +- ShuffleQueryStage 6
   :           +- Exchange hashpartitioning(b#24, 5), true, [id=#364]
   :              +- *(5) BroadcastHashJoin [key#13], [a#23], Inner, BuildLeft, false
   :                 :- BroadcastQueryStage 4
   :                 :  +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#264]
   :                 :     +- CustomShuffleReader local
   :                 :        +- ShuffleQueryStage 0
   :                 :           +- Exchange hashpartitioning(key#13, 5), true, [id=#172]
   :                 :              +- *(1) Filter (isnotnull(value#14) AND (cast(value#14 as int) = 1))
   :                 :                 +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).key AS key#13, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData, true])).value, true, false) AS value#14]
   :                 :                    +- Scan[obj#12]
   :                 +- ShuffleQueryStage 1
   :                    +- Exchange hashpartitioning(a#23, 5), true, [id=#179]
   :                       +- *(2) Filter (b#24 = 1)
   :                          +- *(2) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).a AS a#23, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData2, true])).b AS b#24]
   :                             +- Scan[obj#22]
   +- *(6) BroadcastHashJoin [n#93], [a#33], Inner, BuildRight, false
      :- CustomShuffleReader local
      :  +- ShuffleQueryStage 2
      :     +- Exchange hashpartitioning(n#93, 5), true, [id=#196]
      :        +- *(3) Filter (n#93 = 1)
      :           +- *(3) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).l, true, false) AS l#94]
      :              +- Scan[obj#92]
      +- BroadcastQueryStage 5
         +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#310]
            +- CustomShuffleReader local
               +- ShuffleQueryStage 3
                  +- Exchange hashpartitioning(a#33, 5), true, [id=#210]
                     +- *(4) Filter (a#33 = 1)
                        +- *(4) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData3, true])).a AS a#33, unwrapoption(IntegerType, knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$TestData3, true])).b) AS b#34]
                           +- Scan[obj#32]
   ```




[GitHub] [spark] AmplabJenkins removed a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on DataWritingCommand

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698944562








[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-698907632


   **[Test build #129110 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129110/testReport)** for PR 29797 at commit [`7e0d766`](https://github.com/apache/spark/commit/7e0d766b424cdcac27f4bb3b08e325886daf92b2).
    * This patch **fails Scala style tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] cloud-fan commented on a change in pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #29797:
URL: https://github.com/apache/spark/pull/29797#discussion_r504731011



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
##########
@@ -1258,4 +1262,52 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-32932: Do not use local shuffle reader at final stage on write command") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString,
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val data = for (
+        i <- 1L to 10L;
+        j <- 1L to 3L
+      ) yield (i, j)
+
+      val df = data.toDF("i", "j").repartition($"j")
+      var noLocalReader: Boolean = false
+      val listener = new QueryExecutionListener {
+        override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+          qe.executedPlan match {
+            case plan@(_: DataWritingCommandExec | _: V2TableWriteExec) =>
+              assert(plan.asInstanceOf[UnaryExecNode].child.isInstanceOf[AdaptiveSparkPlanExec])
+              noLocalReader = collect(plan) {
+                case exec: CustomShuffleReaderExec if exec.isLocalReader => exec
+              }.isEmpty
+            case _ => // ignore other events
+          }
+        }
+        override def onFailure(funcName: String, qe: QueryExecution,
+          exception: Exception): Unit = {}
+      }
+      spark.listenerManager.register(listener)
+
+      withTable("t") {
+        df.write.partitionBy("j").saveAsTable("t")
+        sparkContext.listenerBus.waitUntilEmpty()
+        assert(noLocalReader)
+        noLocalReader = false
+      }
+
+      // Test DataSource v2
+      withTempPath { f =>
+        val path = f.getCanonicalPath

Review comment:
       `path` is not needed for noop source. We can just call `save()`

Review comment:
       and we can remove `withTempPath`






[GitHub] [spark] AmplabJenkins commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-707124846








[GitHub] [spark] SparkQA commented on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-708787495


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34373/
   




[GitHub] [spark] manuzhang edited a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader on RepartitionByExpression when coalescing disabled

Posted by GitBox <gi...@apache.org>.
manuzhang edited a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-694710622


   @cloud-fan 
   It's not OK, but I haven't thought of a (simple) way to disable the local shuffle reader on dynamic partition overwrite (or on bucket table overwrite). Any ideas?




[GitHub] [spark] maryannxue edited a comment on pull request #29797: [SPARK-32932][SQL] Do not use local shuffle reader at final stage on write command

Posted by GitBox <gi...@apache.org>.
maryannxue edited a comment on pull request #29797:
URL: https://github.com/apache/spark/pull/29797#issuecomment-726266910


   @manuzhang This change seems incomplete. The problem here is not about the writer, but rather about the repartition Exchange. We should avoid letting LSR rule work on such shuffles across the board, not just for the writer case.
   All you need to do is: 1) move LSR rule into `postStageCreationRules`; and 2) make the LSR rule match an Exchange first:
   ```
       plan match {
         case e: Exchange if canUseLocalShuffleReader(e.child) =>
           e.withNewChildren(Seq(createLocalReader(e.child)))
         case s: SparkPlan =>
           createProbeSideLocalReader(s)
       }
   ```
   So that it'll only modify the child node of an Exchange, which is the purpose of this rule in the first place.

