Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/03/25 16:35:05 UTC

[GitHub] [spark] cloud-fan opened a new pull request #28022: [SPARK-31253][SQL] add metrics to shuffle reader

cloud-fan opened a new pull request #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022
 
 
   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   Add SQL metrics to the AQE shuffle reader (`CustomShuffleReaderExec`) to replace its string description.
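As a rough illustration of what "metrics instead of a string description" means, the sketch below derives numeric counts from a reader's partition specs. It is a minimal sketch under stated assumptions: `PartitionSpec`, `CoalescedSpec`, `SkewedSpec`, `Metric` and `readerMetrics` are hypothetical simplified stand-ins, not Spark's `ShufflePartitionSpec` classes or its `SQLMetric` accumulators, and the metric names are illustrative rather than the ones this PR adds.

```scala
// Hypothetical, simplified stand-ins for Spark's partition spec classes.
sealed trait PartitionSpec
final case class CoalescedSpec(startReducer: Int, endReducer: Int) extends PartitionSpec
final case class SkewedSpec(reducer: Int, startMap: Int, endMap: Int, dataSize: Long)
  extends PartitionSpec

// Hypothetical metric holder; Spark uses SQLMetric accumulators instead.
final class Metric(val name: String) {
  private var total = 0L
  def add(v: Long): Unit = total += v
  def value: Long = total
}

// Turns the reader's partition specs into named numeric values a UI can show.
def readerMetrics(specs: Seq[PartitionSpec]): Map[String, Long] = {
  val numPartitions = new Metric("number of partitions")
  val numSkewed = new Metric("number of skewed partitions")
  val numCoalesced = new Metric("number of coalesced partitions")
  numPartitions.add(specs.size)
  specs.foreach {
    case _: SkewedSpec => numSkewed.add(1)
    // Assumption: only a spec spanning more than one reducer counts as coalesced.
    case c: CoalescedSpec if c.endReducer - c.startReducer > 1 => numCoalesced.add(1)
    case _ => ()
  }
  Seq(numPartitions, numSkewed, numCoalesced).map(m => m.name -> m.value).toMap
}
```

Unlike a single description string, each value can be aggregated and displayed independently.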
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   To be more UI-friendly.
   
   ### Does this PR introduce any user-facing change?
   <!--
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If no, write 'No'.
   -->
   No
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   -->
   New test.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-604094125
 
 
   **[Test build #120366 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120366/testReport)** for PR 28022 at commit [`c4815f4`](https://github.com/apache/spark/commit/c4815f4e1c9725fe0ff345f3922644ad6e046e8c).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


[GitHub] [spark] cloud-fan commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r401655420
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,109 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else if (hasCoalescedPartition && hasSkewedPartition) {
+      "coalesced and skewed"
+    } else if (hasCoalescedPartition) {
+      "coalesced"
+    } else if (hasSkewedPartition) {
+      "skewed"
+    } else {
+      ""
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  }
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = child match {
-        case stage: ShuffleQueryStageExec =>
-          new ShuffledRowRDD(
-            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
-        case _ =>
-          throw new IllegalStateException("operating on canonicalization plan")
+  def hasSkewedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
+  }
+
+  def isLocalReader: Boolean = {
+    if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
+      assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
 
 Review comment:
   Yes, true. Shall we make it a `val` instead of a `def`?
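The trade-off behind the `val`/`def` question, sketched with hypothetical stand-in classes (`Reader`, `PartialMapperSpec` and `CoalescedSpec` are not Spark's): a `def` re-scans `partitionSpecs` and re-runs the `assert` on every call, whereas a `lazy val` evaluates once on first access and caches the result. A plain `val` would also cache, but would run the check eagerly when the node is constructed. The `evaluations` counter exists only to make the caching observable.

```scala
// Hypothetical, simplified stand-ins for Spark's partition spec classes.
sealed trait PartitionSpec
final case class CoalescedSpec(startReducer: Int, endReducer: Int) extends PartitionSpec
final case class PartialMapperSpec(mapIndex: Int, startReducer: Int, endReducer: Int)
  extends PartitionSpec

// Hypothetical reader node; counts evaluations to make def/val behavior visible.
final case class Reader(partitionSpecs: Seq[PartitionSpec]) {
  var evaluations = 0

  // Computed once, on first access, then cached for the lifetime of this node.
  lazy val isLocalReader: Boolean = {
    evaluations += 1
    val local = partitionSpecs.exists(_.isInstanceOf[PartialMapperSpec])
    // A local reader must not mix mapper-side specs with other kinds of specs.
    if (local) assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperSpec]))
    local
  }
}
```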


[GitHub] [spark] SparkQA removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606081559
 
 
   **[Test build #120598 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120598/testReport)** for PR 28022 at commit [`0b92263`](https://github.com/apache/spark/commit/0b92263dca1c7cbb51cfacb0c90fdbb452cef877).


[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-603948314
 
 
   Merged build finished. Test PASSed.


[GitHub] [spark] SparkQA commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606465973
 
 
   **[Test build #120632 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120632/testReport)** for PR 28022 at commit [`a2e4ab3`](https://github.com/apache/spark/commit/a2e4ab37263d577b1211d8bd2fe56f8329cec357).


[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606082371
 
 
   Merged build finished. Test PASSed.


[GitHub] [spark] SparkQA commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606440980
 
 
   **[Test build #120630 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120630/testReport)** for PR 28022 at commit [`a2e4ab3`](https://github.com/apache/spark/commit/a2e4ab37263d577b1211d8bd2fe56f8329cec357).
    * This patch **fails due to an unknown error code, -9**.
    * This patch merges cleanly.
    * This patch adds no public classes.


[GitHub] [spark] maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r398656081
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,113 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else {
 
 Review comment:
   nit: the inner `if` can be moved up one level, into the outer `else if` chain.


[GitHub] [spark] cloud-fan commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-603947604
 
 
   cc @maryannxue @JkSelf 


[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606082384
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/25302/
   Test PASSed.


[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606186804
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/120598/
   Test FAILed.


[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606466581
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/25334/
   Test PASSed.


[GitHub] [spark] cloud-fan commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r400297634
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,113 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else {
+      if (hasCoalescedPartition && hasSkewedPartition) {
+        "coalesced and skewed"
+      } else if (hasCoalescedPartition) {
+        "coalesced"
+      } else if (hasSkewedPartition) {
+        "skewed"
+      } else {
+        ""
+      }
+    }
+    Iterator(desc)
 
 Review comment:
   perf doesn't really matter here...
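For context, each `has*` helper in the diff is a separate `exists` traversal, so `stringArgs` can walk `partitionSpecs` several times. If traversal count were a concern, a single `foldLeft` could collect all three flags at once, as in the sketch below (hypothetical stand-in classes and a hypothetical `describe` function, not Spark's code). As the reply says, for a plan-description string the difference is negligible, so the more readable multi-`exists` form is a reasonable choice.

```scala
// Hypothetical, simplified stand-ins for Spark's partition spec classes.
sealed trait PartitionSpec
final case class CoalescedSpec(startReducer: Int, endReducer: Int) extends PartitionSpec
final case class PartialReducerSpec(reducer: Int, startMap: Int, endMap: Int)
  extends PartitionSpec
final case class PartialMapperSpec(mapIndex: Int, startReducer: Int, endReducer: Int)
  extends PartitionSpec

// One traversal records which kinds of specs are present, then picks the label.
def describe(specs: Seq[PartitionSpec]): String = {
  val (local, coalesced, skewed) = specs.foldLeft((false, false, false)) {
    case ((_, c, s), _: PartialMapperSpec) => (true, c, s)
    case ((l, _, s), _: CoalescedSpec) => (l, true, s)
    case ((l, c, _), _: PartialReducerSpec) => (l, c, true)
  }
  if (local) "local"
  else if (coalesced && skewed) "coalesced and skewed"
  else if (coalesced) "coalesced"
  else if (skewed) "skewed"
  else ""
}
```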


[GitHub] [spark] SparkQA commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-603947452
 
 
   **[Test build #120366 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120366/testReport)** for PR 28022 at commit [`c4815f4`](https://github.com/apache/spark/commit/c4815f4e1c9725fe0ff345f3922644ad6e046e8c).


[GitHub] [spark] cloud-fan commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606462973
 
 
   retest this please


[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606466572
 
 
   Merged build finished. Test PASSed.


[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-603948326
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/25075/
   Test PASSed.


[GitHub] [spark] cloud-fan commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-604306609
 
 
   It is only for the special shuffle reader SQL operator (`CustomShuffleReaderExec`), which is only used by adaptive execution.


[GitHub] [spark] maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r398689742
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##########
 @@ -317,43 +317,11 @@ private object ShuffleStage {
   private def getMapStats(stage: ShuffleQueryStageExec): MapOutputStatistics = {
     assert(stage.resultOption.isDefined, "ShuffleQueryStageExec should" +
       " already be ready when executing OptimizeSkewedPartitions rule")
-    stage.resultOption.get.asInstanceOf[MapOutputStatistics]
+    stage.mapStats
 
 Review comment:
   You already have assert in `QueryStage.mapStats`. This method can be removed now. Although I'd prefer the other way around: not define `mapStats` in `QueryStage` but instead leave the logic in the callers.
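The two options discussed here are a shared `mapStats` accessor on the query stage versus keeping the readiness check and cast at each caller. A minimal sketch of the shared-accessor option, assuming a hypothetical `Stage` class and a `MapStats` stand-in for Spark's `MapOutputStatistics`: the `assert` and `asInstanceOf` live in one place, so a caller such as a skew-join rule would not need its own wrapper around them.

```scala
// Hypothetical stand-in for Spark's MapOutputStatistics.
final case class MapStats(shuffleId: Int, bytesByPartitionId: Array[Long])

// Hypothetical query stage whose result is filled in once the stage materializes.
final class Stage(val id: Int) {
  @volatile var resultOption: Option[Any] = None

  // Single place that checks readiness and performs the cast,
  // so callers do not duplicate the assert.
  def mapStats: MapStats = {
    assert(resultOption.isDefined, s"Stage $id should already be materialized")
    resultOption.get.asInstanceOf[MapStats]
  }
}
```

The alternative preferred in the comment keeps `Stage` minimal and repeats the two lines at each call site; the reply below argues for the shared accessor because `mapStats` has several callers.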


[GitHub] [spark] tgravescs commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader

Posted by GitBox <gi...@apache.org>.
tgravescs commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-604054877
 
 
   Is this only for adaptive scheduler mode? I've seen a bunch of these go by that were specific to it but didn't mention it in the title or description. Can we tag them, or at least mention that in the description?


[GitHub] [spark] cloud-fan commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r400279525
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##########
 @@ -317,43 +317,11 @@ private object ShuffleStage {
   private def getMapStats(stage: ShuffleQueryStageExec): MapOutputStatistics = {
     assert(stage.resultOption.isDefined, "ShuffleQueryStageExec should" +
       " already be ready when executing OptimizeSkewedPartitions rule")
-    stage.resultOption.get.asInstanceOf[MapOutputStatistics]
+    stage.mapStats
 
 Review comment:
   `mapStats` is called in more than one place. I think we should avoid duplicating the code.


[GitHub] [spark] SparkQA commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606081559
 
 
   **[Test build #120598 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120598/testReport)** for PR 28022 at commit [`0b92263`](https://github.com/apache/spark/commit/0b92263dca1c7cbb51cfacb0c90fdbb452cef877).


[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-603948326
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/25075/
   Test PASSed.


[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606612137
 
 
   Merged build finished. Test PASSed.


[GitHub] [spark] maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r398685778
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##########
 @@ -120,7 +120,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
         } else {
           mapStartIndices(i + 1)
         }
-        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex)
+        val dataSize = startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
 
 Review comment:
   Would it make sense to make this size part of the return value from `splitSizeListByTargetSize`?
   Or even make `splitSizeListByTargetSize` return partition specs directly, like the other utility function does?
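maryannxue's second option, a splitter that returns partition specs with their data sizes already attached, could look roughly like the sketch below. It is illustrative only: `PartialReducerSpecSketch`, `SplitWithSizes` and `splitIntoSpecs` are hypothetical simplifications, not Spark's `PartialReducerPartitionSpec` or `splitSizeListByTargetSize`, and the greedy rule (close a group once the next map output would push it past the target) is an assumption.

```scala
// Hypothetical, simplified stand-in for a skew-split partition spec; not Spark's class.
final case class PartialReducerSpecSketch(
    reducerId: Int,
    startMapIndex: Int, // inclusive
    endMapIndex: Int,   // exclusive
    dataSize: Long)

object SplitWithSizes {
  // Greedily groups consecutive map outputs so each group stays near targetSize,
  // and records each group's total byte size next to its index range.
  def splitIntoSpecs(
      reducerId: Int,
      mapPartitionSizes: Array[Long],
      targetSize: Long): Seq[PartialReducerSpecSketch] = {
    val specs = scala.collection.mutable.ArrayBuffer.empty[PartialReducerSpecSketch]
    var start = 0
    var currentSize = 0L
    for (i <- mapPartitionSizes.indices) {
      val size = mapPartitionSizes(i)
      // Close the current (non-empty) group if adding this map output would exceed the target.
      if (i > start && currentSize + size > targetSize) {
        specs += PartialReducerSpecSketch(reducerId, start, i, currentSize)
        start = i
        currentSize = 0L
      }
      currentSize += size
    }
    // Emit the last open group, if there was any input at all.
    if (mapPartitionSizes.nonEmpty) {
      specs += PartialReducerSpecSketch(reducerId, start, mapPartitionSizes.length, currentSize)
    }
    specs.toSeq
  }
}
```

With this shape the caller no longer re-sums `mapPartitionSizes` over each index range; the cost is a splitter tied to one spec type.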


[GitHub] [spark] SparkQA commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606610972
 
 
   **[Test build #120632 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120632/testReport)** for PR 28022 at commit [`a2e4ab3`](https://github.com/apache/spark/commit/a2e4ab37263d577b1211d8bd2fe56f8329cec357).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


[GitHub] [spark] SparkQA commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606436075
 
 
   **[Test build #120630 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120630/testReport)** for PR 28022 at commit [`a2e4ab3`](https://github.com/apache/spark/commit/a2e4ab37263d577b1211d8bd2fe56f8329cec357).


[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606441029
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/120630/


[GitHub] [spark] SparkQA commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
SparkQA commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606186448
 
 
   **[Test build #120598 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120598/testReport)** for PR 28022 at commit [`0b92263`](https://github.com/apache/spark/commit/0b92263dca1c7cbb51cfacb0c90fdbb452cef877).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.


[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606436702
 
 
   Merged build finished. Test PASSed.


[GitHub] [spark] hvanhovell commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r401631876
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,109 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else if (hasCoalescedPartition && hasSkewedPartition) {
+      "coalesced and skewed"
+    } else if (hasCoalescedPartition) {
+      "coalesced"
+    } else if (hasSkewedPartition) {
+      "skewed"
+    } else {
+      ""
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  }
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = child match {
-        case stage: ShuffleQueryStageExec =>
-          new ShuffledRowRDD(
-            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
-        case _ =>
-          throw new IllegalStateException("operating on canonicalization plan")
+  def hasSkewedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
+  }
+
+  def isLocalReader: Boolean = {
+    if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
+      assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
 
 Review comment:
   Isn't this an invariant we should enforce when we create the class?
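hvanhovell's suggestion, checking the "all or none are mapper-side specs" invariant once at construction instead of inside `isLocalReader`, can be sketched with `require` in a case class body. The spec hierarchy and `ReaderWithInvariant` below are hypothetical, simplified stand-ins for `CoalescedPartitionSpec`, `PartialReducerPartitionSpec`, `PartialMapperPartitionSpec` and `CustomShuffleReaderExec`, not the real classes.

```scala
// Hypothetical, simplified partition-spec hierarchy; the real Spark classes differ.
sealed trait ReaderSpec
final case class CoalescedSpec(startReducerIndex: Int, endReducerIndex: Int) extends ReaderSpec
final case class ReducerSplitSpec(reducerIndex: Int, startMapIndex: Int, endMapIndex: Int)
  extends ReaderSpec
final case class MapperSplitSpec(mapIndex: Int, startReducerIndex: Int, endReducerIndex: Int)
  extends ReaderSpec

final case class ReaderWithInvariant(partitionSpecs: Seq[ReaderSpec]) {
  // Checked once when the node is built: either every spec is mapper-side
  // (a local reader) or none of them is.
  require(
    partitionSpecs.forall(_.isInstanceOf[MapperSplitSpec]) ||
      !partitionSpecs.exists(_.isInstanceOf[MapperSplitSpec]),
    "MapperSplitSpec cannot be mixed with other partition specs")

  // With the invariant enforced above, this is a plain predicate with no assert.
  def isLocalReader: Boolean = partitionSpecs.exists(_.isInstanceOf[MapperSplitSpec])
}
```

A mixed list now fails with an `IllegalArgumentException` at construction time rather than the first time `isLocalReader` is evaluated.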


[GitHub] [spark] cloud-fan commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r401656464
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,109 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else if (hasCoalescedPartition && hasSkewedPartition) {
+      "coalesced and skewed"
+    } else if (hasCoalescedPartition) {
+      "coalesced"
+    } else if (hasSkewedPartition) {
+      "skewed"
+    } else {
+      ""
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  }
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = child match {
-        case stage: ShuffleQueryStageExec =>
-          new ShuffledRowRDD(
-            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
-        case _ =>
-          throw new IllegalStateException("operating on canonicalization plan")
+  def hasSkewedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
+  }
+
+  def isLocalReader: Boolean = {
+    if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
+      assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
+      true
+    } else {
+      false
+    }
+  }
+
+  private def shuffleStage = child match {
+    case stage: ShuffleQueryStageExec => Some(stage)
+    case _ => None
+  }
+
+  private lazy val partitionDataSizeMetrics = {
+    val maxSize = SQLMetrics.createSizeMetric(sparkContext, "maximum partition data size")
+    val minSize = SQLMetrics.createSizeMetric(sparkContext, "minimum partition data size")
+    val avgSize = SQLMetrics.createSizeMetric(sparkContext, "average partition data size")
+    val mapStats = shuffleStage.get.mapStats.bytesByPartitionId
+    val sizes = partitionSpecs.map {
 
 Review comment:
   It is, but the perf here probably doesn't matter much.


[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-604095073
 
 
   Merged build finished. Test PASSed.


[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606186804
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/120598/


[GitHub] [spark] cloud-fan commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r400263747
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,113 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else {
+      if (hasCoalescedPartition && hasSkewedPartition) {
+        "coalesced and skewed"
+      } else if (hasCoalescedPartition) {
+        "coalesced"
+      } else if (hasSkewedPartition) {
+        "skewed"
+      } else {
+        ""
+      }
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  }
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = child match {
-        case stage: ShuffleQueryStageExec =>
-          new ShuffledRowRDD(
-            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
-        case _ =>
-          throw new IllegalStateException("operating on canonicalization plan")
+  def hasSkewedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
+  }
+
+  def isLocalReader: Boolean = {
+    if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
+      assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
+      true
+    } else {
+      false
+    }
+  }
+
+  private def shuffleStage = child match {
+    case stage: ShuffleQueryStageExec => Some(stage)
+    case _ => None
+  }
+
+  override lazy val metrics = {
+    if (shuffleStage.isDefined) {
 
 Review comment:
   Yes, but `shuffleStage` is also used in `cachedShuffleRDD`, so we can't inline it here.
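The reason for keeping the helper can be illustrated with a small sketch in which one `Option`-returning `shuffleStage` method is shared by two lazily computed members: one standing in for the size metrics, the other for the cached shuffle RDD. Every name here (`ChildPlan`, `ShuffleStageStub`, `ReaderSharedHelper`, `cachedReadPlan`) is hypothetical; only the error message is taken from the diff.

```scala
// Hypothetical, simplified child plans; not Spark's SparkPlan classes.
sealed trait ChildPlan
final case class ShuffleStageStub(bytesByPartitionId: Array[Long]) extends ChildPlan
case object CanonicalizedStub extends ChildPlan

final class ReaderSharedHelper(child: ChildPlan, partitionIds: Seq[Int]) {
  // Shared helper: used by both members below, which is why it stays a
  // separate method rather than being inlined into either of them.
  private def shuffleStage: Option[ShuffleStageStub] = child match {
    case s: ShuffleStageStub => Some(s)
    case _ => None
  }

  // First user: per-partition sizes feeding a metric.
  lazy val partitionSizes: Seq[Long] =
    partitionIds.map(id => shuffleStage.get.bytesByPartitionId(id))

  // Second user: stands in for building the cached shuffle RDD.
  lazy val cachedReadPlan: String = shuffleStage match {
    case Some(_) => s"read partitions ${partitionIds.mkString(",")}"
    case None => throw new IllegalStateException("operating on canonicalization plan")
  }
}
```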


[GitHub] [spark] SparkQA removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606465973
 
 
   **[Test build #120632 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120632/testReport)** for PR 28022 at commit [`a2e4ab3`](https://github.com/apache/spark/commit/a2e4ab37263d577b1211d8bd2fe56f8329cec357).


[GitHub] [spark] maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r398666766
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,113 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else {
+      if (hasCoalescedPartition && hasSkewedPartition) {
+        "coalesced and skewed"
+      } else if (hasCoalescedPartition) {
+        "coalesced"
+      } else if (hasSkewedPartition) {
+        "skewed"
+      } else {
+        ""
+      }
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  }
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = child match {
-        case stage: ShuffleQueryStageExec =>
-          new ShuffledRowRDD(
-            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
-        case _ =>
-          throw new IllegalStateException("operating on canonicalization plan")
+  def hasSkewedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
+  }
+
+  def isLocalReader: Boolean = {
+    if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
+      assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
+      true
+    } else {
+      false
+    }
+  }
+
+  private def shuffleStage = child match {
+    case stage: ShuffleQueryStageExec => Some(stage)
+    case _ => None
+  }
+
+  override lazy val metrics = {
+    if (shuffleStage.isDefined) {
 
 Review comment:
   `metrics` should never be called if this is false, right?
   Can we inline `shuffleStage` here and use an assert instead?
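The inline-and-assert alternative proposed here could be sketched as follows: the child is pattern-matched directly inside the lazy metric value, and any other child is treated as a programming error. `StageChild`, `ShuffleStageChild`, `OtherChild` and `ReaderWithAssert` are hypothetical stand-ins. The explicit `AssertionError` mirrors what a failed Scala `assert` throws, but unlike `assert` it cannot be elided by compiler flags.

```scala
// Hypothetical, simplified child plans; not Spark's classes.
sealed trait StageChild
final case class ShuffleStageChild(bytesByPartitionId: Array[Long]) extends StageChild
case object OtherChild extends StageChild

final class ReaderWithAssert(child: StageChild, partitionIds: Seq[Int]) {
  // `shuffleStage` inlined as a pattern match; reaching the second case is a bug.
  lazy val partitionSizes: Seq[Long] = child match {
    case stage: ShuffleStageChild =>
      partitionIds.map(stage.bytesByPartitionId(_))
    case other =>
      throw new AssertionError(s"metrics requested for a non-shuffle-stage child: $other")
  }
}
```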


[GitHub] [spark] maryannxue commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
maryannxue commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-604518152
 
 
   also cc @hvanhovell 


[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-604095083
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/120366/


[GitHub] [spark] hvanhovell commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r401633723
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,109 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else if (hasCoalescedPartition && hasSkewedPartition) {
+      "coalesced and skewed"
+    } else if (hasCoalescedPartition) {
+      "coalesced"
+    } else if (hasSkewedPartition) {
+      "skewed"
+    } else {
+      ""
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  }
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = child match {
-        case stage: ShuffleQueryStageExec =>
-          new ShuffledRowRDD(
-            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
-        case _ =>
-          throw new IllegalStateException("operating on canonicalization plan")
+  def hasSkewedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
+  }
+
+  def isLocalReader: Boolean = {
+    if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
+      assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
+      true
+    } else {
+      false
+    }
+  }
+
+  private def shuffleStage = child match {
+    case stage: ShuffleQueryStageExec => Some(stage)
+    case _ => None
+  }
+
+  private lazy val partitionDataSizeMetrics = {
+    val maxSize = SQLMetrics.createSizeMetric(sparkContext, "maximum partition data size")
+    val minSize = SQLMetrics.createSizeMetric(sparkContext, "minimum partition data size")
+    val avgSize = SQLMetrics.createSizeMetric(sparkContext, "average partition data size")
+    val mapStats = shuffleStage.get.mapStats.bytesByPartitionId
+    val sizes = partitionSpecs.map {
 
 Review comment:
   It is probably faster if you compute min/max/avg on the fly.
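Computing the three statistics "on the fly" means traversing the sizes once and tracking min, max and sum together, rather than calling `.max`, `.min` and `.sum` separately. The sketch takes an `Iterator` so the sizes could be produced lazily from the partition specs without building an intermediate collection. `PartitionSizeStats` and `SizeStats` are hypothetical names, and the average uses integer division, so it truncates.

```scala
object PartitionSizeStats {
  // Hypothetical container for the three metrics discussed in the review.
  final case class SizeStats(min: Long, max: Long, avg: Long)

  // Single pass: min, max, sum and count are updated together per element.
  // Returns None for empty input instead of dividing by zero.
  def compute(sizes: Iterator[Long]): Option[SizeStats] = {
    if (!sizes.hasNext) None
    else {
      val first = sizes.next()
      var min = first
      var max = first
      var sum = first
      var count = 1L
      while (sizes.hasNext) {
        val s = sizes.next()
        if (s < min) min = s
        if (s > max) max = s
        sum += s
        count += 1
      }
      Some(SizeStats(min, max, sum / count)) // integer (truncating) average
    }
  }
}
```

The trade-off is readability against one traversal instead of three; for short sequences the difference is likely negligible.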


[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606612137
 
 
   Merged build finished. Test PASSed.


[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606466581
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/25334/


[GitHub] [spark] SparkQA removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606436075
 
 
   **[Test build #120630 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120630/testReport)** for PR 28022 at commit [`a2e4ab3`](https://github.com/apache/spark/commit/a2e4ab37263d577b1211d8bd2fe56f8329cec357).


[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-603948314
 
 
   Merged build finished. Test PASSed.


[GitHub] [spark] gatorsmile closed pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
gatorsmile closed pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022
 
 
   


[GitHub] [spark] dongjoon-hyun commented on a change in pull request #28022: [SPARK-31253][SQL] add metrics to shuffle reader

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r398017307
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,113 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else {
+      if (hasCoalescedPartition && hasSkewedPartition) {
+        "coalesced and skewed"
+      } else if (hasCoalescedPartition) {
+        "coalesced"
+      } else if (hasSkewedPartition) {
+        "skewed"
+      } else {
+        ""
+      }
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
 
 Review comment:
   Is this visible for testing, or do you have some use cases in mind?
   `hasCoalescedPartition`, `hasSkewedPartition`, `isLocalReader`?



[GitHub] [spark] cloud-fan commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r400281351
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
 ##########
 @@ -120,7 +120,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
         } else {
           mapStartIndices(i + 1)
         }
-        PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex)
+        val dataSize = startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
 
 Review comment:
   I'd like to keep `splitSizeListByTargetSize` simple so that we can reuse it in more places, e.g. coalescing the shuffle partitions.
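
As a rough illustration of the split described here, the sketch below assumes the splitting helper returns only the start map indices and that each range's byte size is computed at the call site, following the new `dataSize` line in the diff. `MapRange` and `mapRangesWithSize` are hypothetical names, not Spark APIs.

```scala
// Hypothetical result type: a [startMapIndex, endMapIndex) range plus its total bytes.
final case class MapRange(startMapIndex: Int, endMapIndex: Int, dataSize: Long)

// Turns split start indices into map ranges and sums each range's sizes,
// mirroring `startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum`.
def mapRangesWithSize(mapPartitionSizes: Array[Long], mapStartIndices: Array[Int]): Seq[MapRange] =
  mapStartIndices.indices.map { i =>
    val start = mapStartIndices(i)
    // The last range runs to the end of the map outputs.
    val end =
      if (i == mapStartIndices.length - 1) mapPartitionSizes.length
      else mapStartIndices(i + 1)
    val dataSize = start.until(end).map(mapPartitionSizes(_)).sum
    MapRange(start, end, dataSize)
  }
```

Keeping the size summation outside the splitting helper means the helper only needs to know about sizes and targets, which is what makes it reusable for coalescing as well.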



[GitHub] [spark] maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r398665306
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,113 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else {
+      if (hasCoalescedPartition && hasSkewedPartition) {
+        "coalesced and skewed"
+      } else if (hasCoalescedPartition) {
+        "coalesced"
+      } else if (hasSkewedPartition) {
+        "skewed"
+      } else {
+        ""
+      }
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  }
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = child match {
-        case stage: ShuffleQueryStageExec =>
-          new ShuffledRowRDD(
-            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
-        case _ =>
-          throw new IllegalStateException("operating on canonicalization plan")
+  def hasSkewedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
+  }
+
+  def isLocalReader: Boolean = {
+    if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
+      assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
+      true
+    } else {
+      false
+    }
+  }
+
+  private def shuffleStage = child match {
+    case stage: ShuffleQueryStageExec => Some(stage)
+    case _ => None
+  }
+
+  override lazy val metrics = {
+    if (shuffleStage.isDefined) {
+      val numPartitions = SQLMetrics.createMetric(sparkContext, "number of partitions")
+      numPartitions.set(partitionSpecs.length)
+      Map("numPartitions" -> numPartitions) ++ {
+        if (isLocalReader) {
+          // We split the mapper partition evenly when creating local shuffle reader, so no
+          // data size info is available.
+          Map.empty
+        } else {
+          val maxSize = SQLMetrics.createSizeMetric(sparkContext, "max data size of partitions")
 
 Review comment:
   nit: maximum partition data size, minimum partition data size, average partition data size
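
A minimal sketch of the three size statistics under the suggested display names, assuming the per-partition sizes are already available as `Long` byte counts. `PartitionSizeStats`, `partitionSizeStats` and `displayNames` are hypothetical; the empty-input guard is an added assumption, since `max` and `min` throw on an empty collection.

```scala
// Hypothetical holder for the three statistics, in bytes.
final case class PartitionSizeStats(max: Long, min: Long, avg: Long)

// Returns None for an empty input instead of letting max/min throw.
def partitionSizeStats(sizes: Seq[Long]): Option[PartitionSizeStats] =
  if (sizes.isEmpty) None
  else Some(PartitionSizeStats(sizes.max, sizes.min, sizes.sum / sizes.length))

// Maps each statistic to the display name suggested in the review.
def displayNames(stats: PartitionSizeStats): Map[String, Long] = Map(
  "maximum partition data size" -> stats.max,
  "minimum partition data size" -> stats.min,
  "average partition data size" -> stats.avg)
```

The average uses integer division, matching `sizes.sum / sizes.length` in the diff, so it truncates toward zero.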



[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606186797
 
 
   Merged build finished. Test FAILed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-604095073
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606436710
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/25332/
   Test PASSed.



[GitHub] [spark] maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r398665306
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,113 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else {
+      if (hasCoalescedPartition && hasSkewedPartition) {
+        "coalesced and skewed"
+      } else if (hasCoalescedPartition) {
+        "coalesced"
+      } else if (hasSkewedPartition) {
+        "skewed"
+      } else {
+        ""
+      }
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  }
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = child match {
-        case stage: ShuffleQueryStageExec =>
-          new ShuffledRowRDD(
-            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
-        case _ =>
-          throw new IllegalStateException("operating on canonicalization plan")
+  def hasSkewedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
+  }
+
+  def isLocalReader: Boolean = {
+    if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
+      assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
+      true
+    } else {
+      false
+    }
+  }
+
+  private def shuffleStage = child match {
+    case stage: ShuffleQueryStageExec => Some(stage)
+    case _ => None
+  }
+
+  override lazy val metrics = {
+    if (shuffleStage.isDefined) {
+      val numPartitions = SQLMetrics.createMetric(sparkContext, "number of partitions")
+      numPartitions.set(partitionSpecs.length)
+      Map("numPartitions" -> numPartitions) ++ {
+        if (isLocalReader) {
+          // We split the mapper partition evenly when creating local shuffle reader, so no
+          // data size info is available.
+          Map.empty
+        } else {
+          val maxSize = SQLMetrics.createSizeMetric(sparkContext, "max data size of partitions")
 
 Review comment:
   nit: max partition size, min partition size, avg partition size



[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606612144
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/120632/
   Test PASSed.



[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606082371
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606082384
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/25302/
   Test PASSed.



[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606186797
 
 
   Merged build finished. Test FAILed.



[GitHub] [spark] maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r398677878
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,113 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else {
+      if (hasCoalescedPartition && hasSkewedPartition) {
+        "coalesced and skewed"
+      } else if (hasCoalescedPartition) {
+        "coalesced"
+      } else if (hasSkewedPartition) {
+        "skewed"
+      } else {
+        ""
+      }
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  }
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = child match {
-        case stage: ShuffleQueryStageExec =>
-          new ShuffledRowRDD(
-            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
-        case _ =>
-          throw new IllegalStateException("operating on canonicalization plan")
+  def hasSkewedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
+  }
+
+  def isLocalReader: Boolean = {
+    if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
+      assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
+      true
+    } else {
+      false
+    }
+  }
+
+  private def shuffleStage = child match {
+    case stage: ShuffleQueryStageExec => Some(stage)
+    case _ => None
+  }
+
+  override lazy val metrics = {
+    if (shuffleStage.isDefined) {
+      val numPartitions = SQLMetrics.createMetric(sparkContext, "number of partitions")
+      numPartitions.set(partitionSpecs.length)
+      Map("numPartitions" -> numPartitions) ++ {
+        if (isLocalReader) {
+          // We split the mapper partition evenly when creating local shuffle reader, so no
+          // data size info is available.
+          Map.empty
+        } else {
+          val maxSize = SQLMetrics.createSizeMetric(sparkContext, "max data size of partitions")
+          val minSize = SQLMetrics.createSizeMetric(sparkContext, "min data size of partitions")
+          val avgSize = SQLMetrics.createSizeMetric(sparkContext, "avg data size of partitions")
+          val mapStats = shuffleStage.get.mapStats.bytesByPartitionId
+          val sizes = partitionSpecs.map {
+            case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+              startReducerIndex.until(endReducerIndex).map(mapStats(_)).sum
+            case p: PartialReducerPartitionSpec => p.dataSize
+            case p => throw new IllegalStateException("unexpected " + p)
+          }
+          maxSize.set(sizes.max)
+          minSize.set(sizes.min)
+          avgSize.set(sizes.sum / sizes.length)
+          Map(
+            "maxPartitionDataSize" -> maxSize,
+            "minPartitionDataSize" -> minSize,
+            "avgPartitionDataSize" -> avgSize)
+        }
+      } ++ {
+        if (hasSkewedPartition) {
+          val skewedPartitions = SQLMetrics.createMetric(
+            sparkContext, "number of skewed partitions")
+          var i = 0
 
 Review comment:
   Are we trying to do
   ```
       partitionSpecs.collect {
         case p: PartialReducerPartitionSpec => p.reducerIndex
       }.distinct.length
   ```
   here?
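
The suggested `collect`/`distinct` form can be checked in isolation with simplified stand-in classes (hypothetical, not Spark's actual `ShufflePartitionSpec` hierarchy). Because a skewed reducer is split into several specs that share one `reducerIndex`, this counts skewed reducers rather than splits, and it does not depend on the order of the specs.

```scala
// Simplified, hypothetical stand-ins for Spark's partition spec classes.
sealed trait Spec
final case class CoalescedSpec(startReducerIndex: Int, endReducerIndex: Int) extends Spec
final case class PartialReducerSpec(reducerIndex: Int, startMapIndex: Int, endMapIndex: Int)
  extends Spec

// Keep only skew splits, take their reducer index, and count distinct reducers.
def numSkewedPartitions(specs: Seq[Spec]): Int =
  specs.collect { case p: PartialReducerSpec => p.reducerIndex }.distinct.length
```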



[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606436710
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/25332/
   Test PASSed.



[GitHub] [spark] gatorsmile commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
gatorsmile commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606845167
 
 
   Thanks! Merged to master.



[GitHub] [spark] maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
maryannxue commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r398693046
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,113 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else {
+      if (hasCoalescedPartition && hasSkewedPartition) {
+        "coalesced and skewed"
+      } else if (hasCoalescedPartition) {
+        "coalesced"
+      } else if (hasSkewedPartition) {
+        "skewed"
+      } else {
+        ""
+      }
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  }
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = child match {
-        case stage: ShuffleQueryStageExec =>
-          new ShuffledRowRDD(
-            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
-        case _ =>
-          throw new IllegalStateException("operating on canonicalization plan")
+  def hasSkewedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
+  }
+
+  def isLocalReader: Boolean = {
+    if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
+      assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
+      true
+    } else {
+      false
+    }
+  }
+
+  private def shuffleStage = child match {
+    case stage: ShuffleQueryStageExec => Some(stage)
+    case _ => None
+  }
+
+  override lazy val metrics = {
 
 Review comment:
   nit: could we use variables like `partitionSizeMetrics` and `skewPartitionMetrics` to make the flow a little clearer?
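
One possible shape for the suggested refactor, sketched with `Long` values standing in for Spark's `SQLMetric` objects so that it runs without a `SparkContext`. The `readerMetrics` function and the `numSkewedPartitions` key are hypothetical; the other keys follow the diff.

```scala
// Each metric group gets a named value; the result is their concatenation.
def readerMetrics(
    numPartitions: Int,
    isLocalReader: Boolean,
    partitionSizes: Seq[Long],
    numSkewedPartitions: Int): Map[String, Long] = {
  val numPartitionsMetric = Map("numPartitions" -> numPartitions.toLong)

  // Local readers split mapper output evenly, so no per-partition size is known.
  val partitionSizeMetrics =
    if (isLocalReader || partitionSizes.isEmpty) Map.empty[String, Long]
    else Map(
      "maxPartitionDataSize" -> partitionSizes.max,
      "minPartitionDataSize" -> partitionSizes.min,
      "avgPartitionDataSize" -> partitionSizes.sum / partitionSizes.length)

  // Only report the skew count when some partition was actually split.
  val skewPartitionMetrics =
    if (numSkewedPartitions > 0) Map("numSkewedPartitions" -> numSkewedPartitions.toLong)
    else Map.empty[String, Long]

  numPartitionsMetric ++ partitionSizeMetrics ++ skewPartitionMetrics
}
```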



[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606436702
 
 
   Merged build finished. Test PASSed.



[GitHub] [spark] SparkQA removed a comment on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-603947452
 
 
   **[Test build #120366 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/120366/testReport)** for PR 28022 at commit [`c4815f4`](https://github.com/apache/spark/commit/c4815f4e1c9725fe0ff345f3922644ad6e046e8c).



[GitHub] [spark] AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606441014
 
 
   Merged build finished. Test FAILed.



[GitHub] [spark] cloud-fan commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r398407158
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,113 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else {
+      if (hasCoalescedPartition && hasSkewedPartition) {
+        "coalesced and skewed"
+      } else if (hasCoalescedPartition) {
+        "coalesced"
+      } else if (hasSkewedPartition) {
+        "skewed"
+      } else {
+        ""
+      }
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
 
 Review comment:
   It's also used in `stringArgs`.
   
   Coalesced partitions and skewed partitions can coexist, but local reader partitions are exclusive.
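
The relationship described here can be sketched with simplified, hypothetical stand-ins for Spark's partition spec classes: coalesced and skewed specs may appear together, while partial-mapper (local reader) specs must be the only kind present. The branch order follows the proposed `stringArgs`.

```scala
// Hypothetical, simplified stand-ins for Spark's ShufflePartitionSpec hierarchy.
sealed trait PartitionSpec
final case class Coalesced(startReducerIndex: Int, endReducerIndex: Int) extends PartitionSpec
final case class PartialReducer(reducerIndex: Int, startMapIndex: Int, endMapIndex: Int)
  extends PartitionSpec
final case class PartialMapper(mapIndex: Int, startReducerIndex: Int, endReducerIndex: Int)
  extends PartitionSpec

def hasCoalescedPartition(specs: Seq[PartitionSpec]): Boolean =
  specs.exists(_.isInstanceOf[Coalesced])

def hasSkewedPartition(specs: Seq[PartitionSpec]): Boolean =
  specs.exists(_.isInstanceOf[PartialReducer])

// Local-reader specs are exclusive: if any spec is a PartialMapper, all must be.
def isLocalReader(specs: Seq[PartitionSpec]): Boolean =
  if (specs.exists(_.isInstanceOf[PartialMapper])) {
    assert(specs.forall(_.isInstanceOf[PartialMapper]), "local reader specs mixed with others")
    true
  } else false

// Coalesced and skewed are checked independently, so both can be reported.
def describe(specs: Seq[PartitionSpec]): String =
  if (isLocalReader(specs)) "local"
  else (hasCoalescedPartition(specs), hasSkewedPartition(specs)) match {
    case (true, true)  => "coalesced and skewed"
    case (true, false) => "coalesced"
    case (false, true) => "skewed"
    case _             => ""
  }
```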



[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606441014
 
 
   Merged build finished. Test FAILed.



[GitHub] [spark] Ngone51 commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r400219031
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,113 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else {
+      if (hasCoalescedPartition && hasSkewedPartition) {
+        "coalesced and skewed"
+      } else if (hasCoalescedPartition) {
+        "coalesced"
+      } else if (hasSkewedPartition) {
+        "skewed"
+      } else {
+        ""
+      }
+    }
+    Iterator(desc)
 
 Review comment:
   nit: `Iterator.single(desc)`, which can be more efficient.
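
A small sketch of the two forms: both yield a one-element iterator, but `Iterator(desc)` goes through the varargs `apply`, which first wraps the argument in a `Seq`, whereas `Iterator.single` builds the iterator directly. Any saving is likely minor; the value of `desc` here is just an example.

```scala
val desc = "coalesced"

// Varargs apply: the single argument is wrapped in a Seq, then iterated.
val viaApply: Iterator[Any] = Iterator(desc)

// Direct one-element iterator, no intermediate collection.
val viaSingle: Iterator[Any] = Iterator.single(desc)
```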


[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] add metrics to shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-604095083
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/120366/
   Test PASSed.


[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606441029
 
 
   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/120630/
   Test FAILed.


[GitHub] [spark] Ngone51 commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#discussion_r400280123
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
 ##########
 @@ -62,20 +61,113 @@ case class CustomShuffleReaderExec private(
     }
   }
 
-  override def stringArgs: Iterator[Any] = Iterator(description)
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else {
+      if (hasCoalescedPartition && hasSkewedPartition) {
+        "coalesced and skewed"
+      } else if (hasCoalescedPartition) {
+        "coalesced"
+      } else if (hasSkewedPartition) {
+        "skewed"
+      } else {
+        ""
+      }
+    }
+    Iterator(desc)
+  }
 
-  private var cachedShuffleRDD: RDD[InternalRow] = null
+  def hasCoalescedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+  }
 
-  override protected def doExecute(): RDD[InternalRow] = {
-    if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = child match {
-        case stage: ShuffleQueryStageExec =>
-          new ShuffledRowRDD(
-            stage.shuffle.shuffleDependency, stage.shuffle.readMetrics, partitionSpecs.toArray)
-        case _ =>
-          throw new IllegalStateException("operating on canonicalization plan")
+  def hasSkewedPartition: Boolean = {
+    partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
+  }
+
+  def isLocalReader: Boolean = {
+    if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
+      assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
+      true
+    } else {
+      false
+    }
+  }
+
+  private def shuffleStage = child match {
+    case stage: ShuffleQueryStageExec => Some(stage)
+    case _ => None
+  }
+
+  override lazy val metrics = {
+    if (shuffleStage.isDefined) {
+      val numPartitions = SQLMetrics.createMetric(sparkContext, "number of partitions")
+      numPartitions.set(partitionSpecs.length)
+      Map("numPartitions" -> numPartitions) ++ {
+        if (isLocalReader) {
+          // We split the mapper partition evenly when creating local shuffle reader, so no
+          // data size info is available.
+          Map.empty
+        } else {
+          val maxSize = SQLMetrics.createSizeMetric(sparkContext, "max data size of partitions")
+          val minSize = SQLMetrics.createSizeMetric(sparkContext, "min data size of partitions")
+          val avgSize = SQLMetrics.createSizeMetric(sparkContext, "avg data size of partitions")
+          val mapStats = shuffleStage.get.mapStats.bytesByPartitionId
+          val sizes = partitionSpecs.map {
+            case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+              startReducerIndex.until(endReducerIndex).map(mapStats(_)).sum
+            case p: PartialReducerPartitionSpec => p.dataSize
+            case p => throw new IllegalStateException("unexpected " + p)
+          }
+          maxSize.set(sizes.max)
+          minSize.set(sizes.min)
+          avgSize.set(sizes.sum / sizes.length)
+          Map(
+            "maxPartitionDataSize" -> maxSize,
+            "minPartitionDataSize" -> minSize,
+            "avgPartitionDataSize" -> avgSize)
+        }
+      } ++ {
+        if (hasSkewedPartition) {
+          val skewedPartitions = SQLMetrics.createMetric(
+            sparkContext, "number of skewed partitions")
+          var i = 0
+          var currentReducerIndex = -1
+          while (i < partitionSpecs.length) {
+            partitionSpecs(i) match {
+              case p: PartialReducerPartitionSpec =>
+                if (p.reducerIndex != currentReducerIndex) {
+                  skewedPartitions.add(1)
+                  currentReducerIndex = p.reducerIndex
+                }
+              case _ => currentReducerIndex = -1
+            }
+            i += 1
+          }
 
 Review comment:
   Maybe you're considering perf here, but something like:
   
   ```
   partitionSpecs
     .collect { case p: PartialReducerPartitionSpec => p.reducerIndex }
     .distinct.length
   ```
   
   could be more readable?
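The sketch below is self-contained and compares the two approaches. It uses simplified stand-ins for the partition specs. The `Spec`, `CoalescedSpec` and `SkewSplitSpec` classes are hypothetical, and the real Spark classes carry more fields. The `while` loop in the diff counts each run of consecutive splits that share a reducer index. The collection-style version counts distinct reducer indices instead. The two agree only under an assumption: the splits of each skewed reducer partition must sit next to each other in `partitionSpecs`.

```scala
object SkewedPartitionCount {
  // Hypothetical, simplified stand-ins for Spark's shuffle partition specs.
  sealed trait Spec
  final case class CoalescedSpec(startReducerIndex: Int, endReducerIndex: Int) extends Spec
  final case class SkewSplitSpec(reducerIndex: Int, dataSize: Long) extends Spec

  // Loop style from the diff: each run of consecutive splits of the same
  // reducer is counted once; any non-split spec resets the run.
  def countRuns(specs: IndexedSeq[Spec]): Int = {
    var count = 0
    var currentReducerIndex = -1
    var i = 0
    while (i < specs.length) {
      specs(i) match {
        case p: SkewSplitSpec =>
          if (p.reducerIndex != currentReducerIndex) {
            count += 1
            currentReducerIndex = p.reducerIndex
          }
        case _ => currentReducerIndex = -1
      }
      i += 1
    }
    count
  }

  // Collection style: count the distinct reducer indices among the splits.
  def countDistinct(specs: Seq[Spec]): Int =
    specs.collect { case p: SkewSplitSpec => p.reducerIndex }.distinct.length
}
```

For example, with `IndexedSeq(CoalescedSpec(0, 2), SkewSplitSpec(2, 10L), SkewSplitSpec(2, 12L), CoalescedSpec(3, 5), SkewSplitSpec(5, 8L))`, both functions return 2. They diverge only if another spec separates two splits of the same reducer. For `IndexedSeq(SkewSplitSpec(2, 1L), CoalescedSpec(0, 1), SkewSplitSpec(2, 1L))`, `countRuns` gives 2 but `countDistinct` gives 1.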


[GitHub] [spark] AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on issue #28022: [SPARK-31253][SQL] Add metrics to AQE shuffle reader
URL: https://github.com/apache/spark/pull/28022#issuecomment-606612144
 
 
   Test PASSed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/120632/
   Test PASSed.
